TI: SOAP Abfrage der aktuellen Konnektor Subscriptions

Primärsysteme verhalten sich nicht so wie erwartet, wenn es um die Kommunikation mit dem Konnektor geht? Da hilft nur ein Blick in die aktuelle Subscription Tabelle des Konnektors.

Ist das entsprechende Primärsystem oder Kartenlesegerät überhaupt noch am Konnektor registriert, d.h. verfügt das jeweilige Gerät über eine gültige Subscription?

Mit diesem Python Tool können sie das Verhalten der Subscriptions für den aktuellen Moment auslesen. Das Programm verfügt über Filter-Funktionen, um einzelne Subscriptions zu überwachen. Mit einer Bash-Schleife können Sie die Tabelle auch alle x Minuten in eine Datei schreiben und gleichzeitig ein TCPdump mitschneiden. Sobald eine Subscription verschwindet, können sie anhand des Zeitstempels prüfen, was zum jeweiligen Zeitpunkt im TCPdump zu sehen ist.

Tipp: Meist sind es Firewallregeln, die die Verbindung getrennt haben, sodass der Konnektor die Verbindung noch für geöffnet hält, die Datenpakete jedoch nicht mehr ihr Ziel erreichen.

Das Programm nutzt die SOAP Schnittstelle des Konnektors, ist demnach mit jedem Konnektor kompatibel und läuft auch bei Managed TI / Tiaas Anbietern.

 

Konvertierung von Clientsystemzertifikaten im p12 Container

openssl pkcs12 -in certificate.p12 -out certificate.pem -nodes -legacy

 

Support

Sie können kommerziellen Support für dieses Programm für 250 € netto einmalig erwerben. Hiermit unterstützen Sie ebenfalls die Entwicklung zukünftiger, weiterer Tools dieser Art. Support beinhaltet:

  • Beratung
  • eine kompilierte .exe Datei für Windows
  • die zur Verfügungstellung von Updates
  • Unterstützung bei der Konvertierung der Zertifikate
  • Unterstützung bei der Einrichtung auf einem Arbeitsplatz
  • gesetzliche Gewährleistung

Eine Gewährleistung für das Programm, seine Lauffähigkeit oder die Haftung für sich hieraus ergebener Probleme wird ohne Support nicht gegeben.

Haben Sie Bedarf am Troubleshooting ihrer TI Anwendung? Sie können meine Leistung auch auf Tagessatzbasis buchen.

 

Programm

getSubscription.py

#!/usr/bin/env python3
#2024-11-11 by Christian Krause
import requests, xml.dom.minidom, ssl, sys, re, argparse, urllib3, threading
from socket import timeout
from datetime import datetime
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

################## Base Functions #########################
def main(host, a):
   K = Kon(host, a.mandant, a.clientsystem, a.arbeitsplatz, a.authMethod, a.proxy, a.mandantwide)
   K.getSubscription(a.filter, a.write)

def parseArgs():
   p = argparse.ArgumentParser(description='Konnektor2CheckMK')
   p.add_argument('-v', '--version', action='version', version='%(prog)s 1.0 mit API-Version: 1.1')
   p.add_argument('-d', '--debug', action='store_true', help='No Sanity-Check. Just do it!')
   p.add_argument('-p', '--proxy', action='store_true', help='Do not ignore system Proxy')
   p.add_argument('-i', '--ip', dest='ipArray', nargs='+', metavar=("<IP/IP-Range/IP-Ranges>"), required=True, help='IP or IP-Range of Konnektor: 10.1.1.17 or 10.1.1.20-35')
   p.add_argument('-m', '--mandant', required=True, metavar=("<Mandant>"))
   p.add_argument('-c', '--clientsystem', required=True, metavar=("<Clientsystem>"))
   p.add_argument('-a', '--arbeitsplatz', required=True, metavar=("<Arbeitsplatz>"))
   p.add_argument('-w', '--mandantwide', action='store_true', help='Set Mandantwide Option')
   p.add_argument('-s', '--authmethod', dest='authMethod', default="https", nargs='+', metavar=("<Auth Mechanism> [FileName] or <Auth Mechanism [<User> <Pass>]"), help='Auth-Mechanism: http, https, auth, cert')
   p.add_argument('-f', '--filter', metavar=("<String>"), help='Filter for special String')
   p.add_argument('-o', '--write', action='store_true', help='Write output to disk <ip>.log')
   return p.parse_args()

def initializeDicts():
   # {"inputVar": ["Fehlermeldung", "Prüf-Regex"]}
   global sanitizingDict
   sanitizingDict = {
      "ipArray": ["Ungültige IP-Adresse/IPRange! Range z.B. 192.168.0.17-24", "^((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])(-25[0-5]|-2[0-4][0-9]|-1[0-9][0-9]|-[1-9]?[0-9])?$"],
      "mandant": ["Mandant: Ungültige Zeichen oder Zeichenkette länger als 64 Zeichen! Gültige Zeichen: A-Z a-z 0-9 öÖäÄüÜß @#_-§%+:=.", "^[a-zA-Z0-9öÖäÄüÜß@#_\-§%+:=\.]{1,64}$"],
      "clientsystem": ["ClientSystem: Ungültige Zeichen oder Zeichenkette länger als 64 Zeichen! Gültige Zeichen: A-Z a-z 0-9 öÖäÄüÜß @#_-§%+:=.", "^[a-zA-Z0-9öÖäÄüÜß@#_\-§%+:=\.]{1,64}$"],
      "arbeitsplatz": ["Arbeitsplatz: Ungültige Zeichen oder Zeichenkette länger als 64 Zeichen! Gültige Zeichen: A-Z a-z 0-9 öÖäÄüÜß @#_-§%+:=.", "^[a-zA-Z0-9öÖäÄüÜß@#_\-§%+:=\.]{1,64}$"],
      "authMethod": ["", "^(http|https|cert|auth)"],
    }

   global soapBody
   soapBody =  """
        <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:v7="http://ws.gematik.de/conn/EventService/v7.2" xmlns:v2="http://ws.gematik.de/conn/ConnectorContext/v2.0" xmlns:v5="http://ws.gematik.de/conn/ConnectorCommon/v5.0">
        <soapenv:Header/>
        <soapenv:Body>
            <v7:GetSubscription mandant-wide="%s">
            <v2:Context>
                <v5:MandantId>%s</v5:MandantId>
                <v5:ClientSystemId>%s</v5:ClientSystemId>
                <v5:WorkplaceId>%s</v5:WorkplaceId>
            </v2:Context>
            </v7:GetSubscription>
        </soapenv:Body>
        </soapenv:Envelope>
    """ 

def ipRange(ipRanges):
   ipArray = []
   for ipRange in ipRanges:
      if re.search('-', ipRange):
         fromIP, toOctet = ipRange.rsplit("-", 1)
         ipBase, fromOctet = fromIP.rsplit(".", 1)
         if int(toOctet) <= int(fromOctet):
            print('"Bis"-Angabe im Konnektor-Range muss größer sein als "Von"-Angabe!')
            sys.exit()
         ipArray += [ipBase + "." + str(i) for i in range(int(fromOctet), int(toOctet) + 1, 1)]
      else:
         ipArray.append(ipRange)
   return ipArray

def Sanitizing():
   for inputVar in vars(args):
      if not (inputValue := getattr(args, inputVar)):
         continue
      elif not type(inputValue) == list:
         inputValue = [inputValue]
      # Prüfe Regex für authMethod Parameter
      if inputVar == "authMethod":
         message, regex = sanitizingDict.get(inputVar)
         if not (re.search(regex, inputValue[0])):
            print("Ungültige Authentifizierungsmethode: http, https, auth, cert")
            return False
         if inputValue[0] == "auth" and not len(inputValue) == 3:
            print("Bitte Benutzername und Kennwort als Parameter eingeben!")
            return False
         elif inputValue[0] == "cert" and not len(inputValue) == 2:
            print("Bitte Pfad der Zertifikatsdatei angeben (Dateiname: globalcert.pem und globalcert.txt)")
            return False
      # Prüfe Regex für alles andere
      elif inputVar in sanitizingDict.keys():
         message, regex = sanitizingDict.get(inputVar)
         for inputString in inputValue:
            if not (re.search(regex, inputString)):
               print(message)
               return False
   return True

################## Kon Class #########################
class Kon:
   def __init__(self, host, Mandant, ClientSystem, Arbeitsplatz, authMethod, Proxy, MandantWide):
      self.host = host
      self.Mandant = Mandant
      self.MandantWide = "true" if MandantWide else "false"
      self.ClientSystem = ClientSystem
      self.Arbeitsplatz = Arbeitsplatz
      self.headers = {}
      self.session = requests.session()
      self.session.trust_env = Proxy
      # Set Auth Parameter
      self.URL = f'https://{host}:443'
      self.cert = None
      self.Auth = None
      if authMethod[0] == 'http':
         self.URL = f'http://{host}:80'
      elif authMethod[0] == "cert":
         self.cert = authMethod[1]
      elif authMethod[0] == "auth":
         self.Auth = f'{authMethod[1]}:{authMethod[2]}'
         self.Auth = "'Basic ' + str(b64encode(self.Auth.encode()))"

   def konComm(self, Service, data=None):
      Message = ""
      try:
         if self.Auth: self.headers.update({"Authorization": self.Auth})
         self.response =\
            self.session.post(f'{self.URL}{Service}', timeout=6, headers=self.headers, verify=False, cert=self.cert, data=data) if data else\
            self.session.get(f'{self.URL}{Service}', timeout=6, headers=self.headers, verify=False, cert=self.cert)
         assert self.response.status_code == 200
         return True
      except requests.exceptions.Timeout:
         Message = "Konnektor Timeout"
      except IOError:
         Message = "Es ist ein Kommunikationsfehler mit dem Konnektor aufgetreten. Fehlendes Zertifikat oder TLS Verbindung fehlgeschlagen."
      except AssertionError:
         status_code = str(self.response.status_code)
         Message = f' HTTP Status Code: {status_code}'
      except:
         Message = "Ein undefinierter Fehler ist aufgetreten."
      if Message: print(Message)
      return False

   def getSubscription(self, filter, write):
      data = soapBody% (self.MandantWide, self.Mandant, self.ClientSystem, self.Arbeitsplatz)
      self.headers = {
         "Content-Type": "text/xml; charset=""utf-8""",
         "Content-Length": str(len(data)),
         "SOAPAction": "http://ws.gematik.de/conn/EventService/v7.2#GetSubscription"
         }
      if not self.konComm('/service/systeminformationservice', data):
         return False
      dom = xml.dom.minidom.parseString(self.response.text)
      if not filter:
         if write: self.writeFile(dom.toprettyxml())
         print(dom.toprettyxml())
         return True
      node = dom.childNodes[0].childNodes[0].childNodes[0].childNodes[1]
      self.Soap = ''
      for e in node.childNodes:
         Found = False
         # Find filter
         for n in e.childNodes:
            prefix, tag = xml.dom.minidom._nssplit(n.nodeName)
            if n.firstChild.data == filter:
               Found = True
         # Print if found
         if Found:
            msg = ""
            for n in e.childNodes:
               prefix, tag = xml.dom.minidom._nssplit(n.nodeName)
               if write: 
                  msg = '%s %s'%(msg, f'{tag}: {n.firstChild.data}\n')
               print(f'{tag}: {n.firstChild.data}')
            if write: self.writeFile(msg)
      return True
   
   def writeFile(self, Message):
      with open(f'{self.host}.log', "a") as f:
         Date = datetime.now()
         isodate = Date.isoformat().rsplit(':', 1)[0]
         f.write(f'==================== {isodate} ====================\n')
         f.write(f'{Message}\n')
   

################# Main Function ####################
if __name__ == "__main__":
   initializeDicts()
   args = parseArgs()
   if args.debug or Sanitizing():
      for host in ipRange(args.ipArray):
         threading.Thread(target=main, args=(host, args)).start()

Das Programm steht unter der BSD Lizenz.

 

Aufruf

$ ./getSubscription.py -i 172.18.2.1 -m 20240916_M -c 20240916_C -a 20240916_A1 -s cert ~/20240916_C.pem