TI: Konnektormonitoring mit CheckMK

Mit dem folgenden Skript ist es möglich, Konnektoren via SOAP Aufruf mit CheckMK zu überwachen. Das Programm nutzt die SOAP Schnittstelle des Konnektors, ist demnach mit jedem Konnektor kompatibel und läuft auch bei Managed TI / Tiaas Anbietern.

 

Konvertierung von Clientsystemzertifikaten im p12 Container

openssl pkcs12 -in certificate.p12 -out certificate.pem -nodes -legacy

 

Support

Sie können kommerziellen Support für dieses Programm für 250 € netto einmalig erwerben. Hiermit unterstützen Sie ebenfalls die Entwicklung zukünftiger, weiterer Tools dieser Art. Support beinhaltet:

  • Beratung
  • die zur Verfügungstellung von Updates
  • Unterstützung bei der Konvertierung der Zertifikate
  • Unterstützung bei der Einrichtung auf einem Server
  • gesetzliche Gewährleistung

Eine Gewährleistung für das Programm, seine Lauffähigkeit oder die Haftung für sich hieraus ergebener Probleme wird ohne Support nicht gegeben.

Haben Sie Bedarf am Troubleshooting ihrer TI Anwendung? Sie können meine Leistung auch auf Tagessatzbasis buchen.

 

Programm

Konnektor_Check.py

#!/usr/bin/env python3
import requests, xml.dom.minidom, ssl, sys, re, argparse, urllib3, subprocess, os
from base64 import b64encode
from socket import timeout
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 
soapBody = """
 <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:v7="http://ws.gematik.de/conn/EventService/v7.2" xmlns:v2="http://ws.gematik.de/conn/ConnectorContext/v2.0" xmlns:v5="http://ws.gematik.de/conn/ConnectorCommon/v5.0" xmlns:v21="http://ws.gematik.de/conn/CardServiceCommon/v2.0">
 <soapenv:Header/>
   <soapenv:Body>
     <v7:GetResourceInformation>
     <v2:Context>
         <v5:MandantId>%s</v5:MandantId>
         <v5:ClientSystemId>%s</v5:ClientSystemId>
         <v5:WorkplaceId>%s</v5:WorkplaceId>
     </v2:Context>
     </v7:GetResourceInformation>
 </soapenv:Body>
 </soapenv:Envelope>
"""
 
class Koco:
   def __init__(self, host, Mandant, ClientSystem, Arbeitsplatz, authMethod):
      self.host = host
      self.Mandant = Mandant
      self.ClientSystem = ClientSystem
      self.Arbeitsplatz = Arbeitsplatz
      self.headers = {}
      self.Message = ''
      self.Sds = ''
      self.Soap = ''
      self.session = requests.session()
      self.session.trust_env = False
      # Set Auth Parameter
      self.URL = f'https://{host}:443'
      self.cert = None
      self.Auth = None
      if authMethod[0] == 'http':
         self.URL = f'http://{host}:80'
      elif authMethod[0] == "cert":
         self.cert = authMethod[1]
      elif authMethod[0] == "auth":
         self.Auth = f'{authMethod[1]}:{authMethod[2]}'
         self.Auth = "'Basic ' + str(b64encode(self.Auth.encode()))"
 
   def kocoTalk(self, Service, data=None):
      try:
         if self.Auth: self.headers.update({"Authorization": self.Auth})
         self.response =\
            self.session.post(f'{self.URL}{Service}', timeout=6, headers=self.headers, verify=False, cert=self.cert, data=data) if data else\
            self.session.get(f'{self.URL}{Service}', timeout=6, headers=self.headers, verify=False, cert=self.cert)
         assert self.response.status_code == 200
         return True
      except requests.exceptions.Timeout:
         self.Message += "Konnektor Timeout"
      except:
         self.Message += "Ein Fehler ist aufgetreten."
      return False
 
   def ping(self, host):
      p = subprocess.Popen(['ping', host,'-4', '-c','1',"-W","2"],stdout=subprocess.DEVNULL)
      p.wait()
      return not p.poll()
 
   def sdsCheck(self):
      if not self.kocoTalk('/connector.sds'):
         return False
      mo = re.search('<FWVersion>(.*)</FWVersion>', str(self.response.text))
      self.Sds = f'Firmware: {mo.group(1)}'
      return True
 
   def soapCheck(self):
      data = soapBody% (self.Mandant, self.ClientSystem, self.Arbeitsplatz)
      self.headers = {
         "Content-Type": "text/xml; charset=""utf-8""",
         "Content-Length": str(len(data)),
         "SOAPAction": "http://ws.gematik.de/conn/EventService/v7.2#GetResourceInformation"
         }
      if not self.kocoTalk('/service/systeminformationservice', data):
         return False
      node = xml.dom.minidom.parseString(self.response.text).childNodes[0].childNodes[0].childNodes[0].childNodes[1].childNodes[2]
      self.Soap = ''
      for e in node.childNodes:
         for n in e.childNodes:
            prefix, tag = xml.dom.minidom._nssplit(n.nodeName)
            if tag == 'Value':
               val = n.firstChild.data
            elif tag == 'ErrorCondition':
               ec = n.firstChild.data
            elif tag == 'Severity':
               sev = n.firstChild.data
            elif tag == "Type":
               type = n.firstChild.data
            elif tag == "ValidFrom":
               start = n.firstChild.data.split('.')[0]
         if val == 'true':
            self.Soap += (f' {ec}')
      return True
 
if __name__ == "__main__":
   p = argparse.ArgumentParser(description='minimalKonCheck')
   p.add_argument('-i', '--ip', metavar=("<IP>"), required=True, help='IP Address')
   p.add_argument('-m', '--mandant', required=True, metavar=("<Mandant>"))
   p.add_argument('-c', '--clientsystem', required=True, metavar=("<Clientsystem>"))
   p.add_argument('-a', '--arbeitsplatz', required=True, metavar=("<Arbeitsplatz>"))
   p.add_argument('-s', '--authmethod', default="https", nargs='+', metavar=("<Auth Mechanism> [FileName] or <Auth Mechanism [<User> <Pass>]"), help='Auth-Mechanism: http, https, auth, cert')
   a = p.parse_args()
 
   K = Koco(a.ip, a.mandant, a.clientsystem, a.arbeitsplatz, a.authmethod)
   pingMSG = '' if K.ping(a.ip) else 'nicht '
   MSG = ''
   try:
      if not K.sdsCheck() or not K.soapCheck():
         raise error
      print(f'0 Konnektor_{a.ip} - {K.Sds}, {K.Soap} {MSG}')
   except:
      MSG = f'{K.Message}, Ping {pingMSG}erfolgreich'
      print(f'2 Konnektor_{a.ip} - {MSG}')

Das Programm steht unter der BSD Lizenz.

 

Aufruf

Das Programm muss mit passenden Parametern aufgerufen werden. Hierzu gehört die IP Adresse, Infomodellparameter und ggf. ein TLS Zertifikat im PEM Format.

Das ganze packt man daher am besten in ein Shell Skript.

#!/bin/bash
/usr/bin/Konnektor_Check.py -i 172.18.2.1 -m checkmk -c checkmk -a checkmk -s cert ./checkmk.pem