Starface Adressbuch: SQL / CardDav Import via REST API

Zur Synchronisation eines Starface Adressbuches mit einem CRM System ist ein Zugriff auf die SQL Datenbank des CRM erforderlich.

Hierzu gibt es kostenpflichtige Module, die einige Nachteile mit sich bringen:

  • Sie werden i.d.R. mit jedem großen Starface Update zerschossen und erfordern manuelle Nacharbeiten bei jedem größeren Starface Update
  • Der Preis: hohe, monatliche Beträge, die mit der Anzahl der Kontakte skalieren sind genauso vertreten wie Module mit dreistelligem Einmalbetrag zzgl. jährlicher Wartungsgebühren.
  • Sie funktionieren nicht mit einer kostenfreien Starface Free Lizenz

Das folgende Python Skript übernimmt die Synchronisation 'von extern', d.h. Sie läuft auf einem (Linux-) Server mit Zugriff auf die SQL Datenbank und auf die Telefonanlage. Das funktioniert i.d.R. sehr lange Updatestabil.

Hinweise:

  • Die Synchronisation funktioniert ebenfalls mit Carddav-Quellen wie Nextcloud/Owncload oder einem Synology NAS.
  • Der Carddav Adressbuch-Link muss der vollständige Link zum Adressbuch sein, nicht nur der zur Haupt-URL der Nextcloud Instanz wie im Quellcode ggf. suggeriert wird. Ggf. schauen Sie in ihrem Android Handy mit DAVx5 nach, dort steht der genaue Link.
  • Jetzt haben Sie schon so weit gelesen, aber: Ggf. müssen Sie ihre Kontakte gar nicht in die Starface importieren. Sie könnten z.B. ein LDAP Directory betreiben, in welchem sie ihre Kontakte ablegen
  • Sie können das Programm auch unter Windows laufen lassen, benötigen dann jedoch eine Python Laufzeitumgebung.
  • Ich biete das Skript für "Techies" hier als Ideengeber kostenfrei und ohne Gewährleistung oder Haftung zum Download. Sofern Sie an Support interessiert sind, können Sie ein Support Paket erwerben. Dieses beinhaltet Beratung, Einrichtung und übernahme einer Gewährleistung. Der Kostenpunkt liegt bei 400 € netto - ohne Folgekosten.

 

Um das Skript auf einer Starface Telefonanlage zu installieren wird zuerst eine SSH Verbindung auf die Starface hergestellt und dann die folgenden Pakete installiert:

dnf install python3-pip
pip3 install --upgrade pip
pip3 install vobject requests lxml
touch starface_addr.py
chmod +x starface_addr.py

# Nun das starface_addr.py Skript einfügen
#STRG+INS zum einfügen der Zwischenablage, STRG + C im Anschluss zum beenden. Letzte Zeile muss eine leere Zeile sein
cat > starface_addr.py


 # Nur für SQL: Neuere Versionen machen Probleme, daher die alte Version installieren
pip install pymssql==2.1.4


# Nur für VCard
pip install vobject 
# Im Anschluss das Carddav Skript mit dem unten stehenden Befehl einfügen
#STRG+INS zum einfügen der Zwischenablage, STRG + C im Anschluss zum beenden. Letzte Zeile muss eine leere Zeile sein
cat > carddav.py

 

Der Aufruf des Skriptes erfolgt mit Angabe von Input- und Output-Methode. Soll von SQL nach Starface übertragen werden, entsprechend:

 ./starface_addr.py -i sql -o starface

Um Daten von Carddav zu holen und testweise auf der Kommandozeile auszugeben wäre der Aufruf:

 ./starface_addr.py -i carddav -o print

Regelmäßiges Ausführen mit Cron via 'crontab -e'. Folgende Zeile einfügen für die tägliche Synchronisation um 07:00 Uhr morgens von Carddav nach Starface einfügen. An eigene Bedürfnisse dann individuell anpassen:

0 7 * * * PATH="$PATH:/usr/sbin:/usr/local/bin/" /root/starface_addr.py -i carddav -o starface > /dev/null

 

Hier das Skript:

#!/usr/bin/env python3
# Version 2.0 (Integrated CardDAV + Optimized Bulk Sync)
# Christian Krause

import io, csv, requests, json, sys, hashlib, re, argparse, configparser, urllib3
import lxml.etree as ET
from urllib.parse import urlparse

# ==========================================
# Konfiguration
# ==========================================

# SQL Server
sqlServer = '192.168.178.250:49761'
sqlDB = 'repdoc'
sqlUser = 'sa'
sqlPass = 'XXX'
sqlTableName = 'Kunden'

# Carddav (Nextcloud, Owncloud)
cdavUrl = 'https://nextcloud.xxxx.de'
cdavUser = 'Christian'
cdavPass = 'XXX'
cdavAuth = 'basic'
cdavVerify = True

# Starface
sfProto = 'https'
sfServer = '192.168.178.15'
sfUser = '0001'
sfPass = 'XXX'

# Default Vorwahl
country = '+49'
city = '2133'

# Spaltenzuweisung
sqlTables = {
    "Vorname":  ["Vorname [contact:firstname]"  , False, '0', "Name2"],
    "Nachname": ["Name [contact:familyname]"    , False, '1', "Name"],
    "Firma":    ["Firma [contact:company]"      , False, '2', ""],
    "Telefon":  ["Rufnummer [telephone:phone]"  , True , '3', "Telefon"],
    "Telefon2": ["Privat [telephone:homephone]" , True , '4', "ZusatzTel"],
    "Mobil":    ["Mobil [telephone:mobile]"     , True , '5', "Mobil"],
    "Fax":      ["Fax [telephone:fax]"          , True , '6', "Telefax"],
    "eMail":    ["E-Mail [email:e-mail]"        , False, '7', ""],
    "PLZ":      ["PLZ [address:postcode]"       , False, '8', ""],
    "Stadt":    ["Stadt [address:city]"         , False, '9', ""],
    "Strasse":  ["Straße [address:street]"      , False, '10', ""],
}

# Regex Vorkompilierung
RE_00 = re.compile(r'^00')
RE_0 = re.compile(r'^0([1-9])')
RE_NON_ZERO = re.compile(r'^([1-9])')
RE_SPACES = re.compile(r' +')

# ==========================================
# Integrierter CardDAV Reader (Bulk-Optimiert)
# ==========================================

class CardDAVReader:
    def __init__(self, resource, user='', passwd='', verify=True, auth='basic'):
        if not verify:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        split_url = urlparse(resource)
        self.url_resource = resource
        self.url_base = f"{split_url.scheme}://{split_url.netloc}"

        self.session = requests.session()
        self._settings = {'verify': verify}
        
        if auth == 'basic':
            self._settings['auth'] = (user, passwd)
        elif auth == 'digest':
            from requests.auth import HTTPDigestAuth
            self._settings['auth'] = HTTPDigestAuth(user, passwd)
            
        self.headers = {"User-Agent": "starface-sync/2.0"}
        
        # Validierung des Endpunkts
        resp = self.session.request('PROPFIND', self.url_resource, headers=self.headers, **self._settings)
        resp.raise_for_status()

    def get_abook(self):
        headers = dict(self.headers)
        headers['Depth'] = '1'
        resp = self.session.request('PROPFIND', self.url_resource, headers=headers, **self._settings)
        resp.raise_for_status()
        
        namespace = "{DAV:}"
        try:
            element = ET.fromstring(resp.content)
        except ET.XMLSyntaxError:
            return {}

        abook = {}
        for response in element.findall(f"{namespace}response"):
            href_elem = response.find(f"{namespace}href")
            if href_elem is None or not href_elem.text: continue
                
            href = href_elem.text
            propstat = response.find(f"{namespace}propstat")
            if propstat is not None:
                prop = propstat.find(f"{namespace}prop")
                if prop is not None:
                    getcontenttype = prop.find(f"{namespace}getcontenttype")
                    if getcontenttype is not None and getcontenttype.text and "vcard" in getcontenttype.text.lower():
                        getetag = prop.find(f"{namespace}getetag")
                        abook[href] = getetag.text if (getetag is not None and getetag.text) else ""
        return abook

    def get_multiple_vcards(self, hrefs, chunk_size=100):
        if not hrefs: return {}
        vcards = {}

        for i in range(0, len(hrefs), chunk_size):
            chunk = hrefs[i:i + chunk_size]
            
            # XML für Nextcloud (addressbook-multiget mit <allprop/>)
            nsmap = {'d': 'DAV:', 'c': 'urn:ietf:params:xml:ns:carddav'}
            root = ET.Element(f"{{urn:ietf:params:xml:ns:carddav}}addressbook-multiget", nsmap=nsmap)
            
            prop = ET.SubElement(root, f"{{DAV:}}prop")
            ET.SubElement(prop, f"{{DAV:}}getetag")
            addr_data = ET.SubElement(prop, f"{{urn:ietf:params:xml:ns:carddav}}address-data")
            ET.SubElement(addr_data, f"{{urn:ietf:params:xml:ns:carddav}}allprop")
            
            for href in chunk:
                href_el = ET.SubElement(root, f"{{DAV:}}href")
                href_el.text = href

            payload = ET.tostring(root, encoding='utf-8', xml_declaration=True)
            
            headers = dict(self.headers)
            headers['Depth'] = '1'
            headers['Content-Type'] = 'application/xml; charset=utf-8'

            resp = self.session.request('REPORT', self.url_resource, data=payload, headers=headers, **self._settings)
            resp.raise_for_status()

            element = ET.fromstring(resp.content)
            for response in element.findall(f"{{DAV:}}response"):
                href_elem = response.find(f"{{DAV:}}href")
                if href_elem is None or not href_elem.text: continue
                
                propstat = response.find(f"{{DAV:}}propstat")
                if propstat is not None:
                    prop = propstat.find(f"{{DAV:}}prop")
                    if prop is not None:
                        addr_data_elem = prop.find(f"{{urn:ietf:params:xml:ns:carddav}}address-data")
                        if addr_data_elem is not None and addr_data_elem.text:
                            vcards[href_elem.text] = addr_data_elem.text.encode('utf-8')
        return vcards

# ==========================================
# Main Logic
# ==========================================

def parseArgs():
    c = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, add_help=False)
    c.add_argument("-C", "--config", help="Specify config file", metavar="config")
    defaults = { }
    args, remaining_argv = c.parse_known_args()

    if args.config:
        config = configparser.ConfigParser()
        config.read([args.config])
        defaults.update(dict(config.items("defaults")))

    p = argparse.ArgumentParser(parents=[c], description='SQL/CardDAV to Starface Sync Tool')
    p.set_defaults(**defaults)
    p.add_argument('-v', '--version', action='version', version='%(prog)s 1.6.1')
    p.add_argument('-i', '--input', choices=['sql', 'carddav'], required=True, help='Input Source: sql or carddav')
    p.add_argument('-o', '--output', choices=['print', 'csv', 'starface'], required=True, help='Output Source')

    return p.parse_args(remaining_argv)

def clean_number(number):
    number = ''.join(c for c in number if c.isdigit() or c == '+')
    if len(number) < 4: return ''
    number = RE_00.sub('+', number)
    number = RE_0.sub(country + r'\1', number)
    return RE_NON_ZERO.sub(country + city + r'\1', number)


def read_sql(sqlServer, sqlUser, sqlPass, sqlDB):
    csvHeader = []
    queryIsNumberList = []
    queryTableList = []

    for tableColoum in sqlTables:
        csvHeader.append(sqlTables.get(tableColoum)[0])
        if sqlTables.get(tableColoum)[3]:
            queryTableList.append(sqlTables.get(tableColoum)[3])
        else:
            queryTableList.append(f"'' AS {tableColoum}")
        queryIsNumberList.append(sqlTables.get(tableColoum)[1])
    
    queryString = ','.join(queryTableList)
    
    def sql_generator():
        import pymssql
        with pymssql.connect(sqlServer, sqlUser, sqlPass, sqlDB) as conn:
            with conn.cursor() as cursor:
                cursor.execute('SELECT %s FROM %s'% (queryString, sqlTableName))
                row = cursor.fetchone()
                while row:
                    row = list(row)
                    contactWithNumber = False
                    
                    # ALLE Spalten durchgehen
                    for index in range(len(row)):
                        # 1. NULL Werte aus SQL sicher in leere Strings umwandeln
                        if row[index] is None:
                            row[index] = ''
                        else:
                            # 2. Strings von versteckten Leerzeichen befreien (z.B. bei CHAR(50) Feldern)
                            row[index] = str(row[index]).strip()
                            # 3. Mehrfache Leerzeichen im String auf eines reduzieren
                            row[index] = RE_SPACES.sub(' ', row[index])
                            
                        # 4. Wenn es eine Telefon-Spalte ist, die Nummer bereinigen
                        if queryIsNumberList[index] and row[index]:
                            row[index] = clean_number(row[index])
                            if row[index]: # Wenn nach der Bereinigung noch eine Nummer übrig ist
                                contactWithNumber = True
                                
                    if contactWithNumber:
                        # WICHTIGER FIX: Falls das SQL-Feld "Nachname" leer ist, aber eine Firma existiert
                        # setzen wir die Firma als Nachnamen, damit der Key (row[1]) nicht leer ist.
                        if not row[1] and row[2]:
                            row[1] = row[2]
                            
                        # Nur yielding, wenn ein Suchbegriff (Nachname/Firma) existiert
                        if row[1]: 
                            yield row
                            
                    row = cursor.fetchone()
                    
    return sql_generator(), csvHeader


def get_carddav(cdavUrl, user=cdavUser, passwd=cdavPass, auth=cdavAuth, verify=cdavVerify):
    import vobject
    csvHeader = [sqlTables.get(col)[0] for col in sqlTables]
    
    def carddav_generator():
        cdav = CardDAVReader(cdavUrl, user=user, passwd=passwd, verify=verify, auth=auth)
        abook = cdav.get_abook()
        hrefs = list(abook.keys())
        nCards = len(hrefs)
        
        print(f'Fetching {nCards} cards in batches...')
        vcard_batch = cdav.get_multiple_vcards(hrefs)
        
        for href, vCards_bytes in vcard_batch.items():
            vCards = vCards_bytes.decode("utf-8")
            for vCard in vobject.readComponents(vCards):
                row = [''] * 11
                if 'n' in vCard.contents:
                    rawName = vCard.contents['n'][0].value
                elif 'org' in vCard.contents:
                    rawName = vCard.contents['org'][0].value
                elif 'fn' in vCard.contents:
                    rawName = vCard.contents['fn'][0].value
                else:
                    continue
                    
                name = RE_SPACES.sub(' ', str(rawName).strip())
                try:
                    row[0], row[1] = name.rsplit(" ", 1)
                except:
                    row[1] = name
                    
                try:
                    numbers = [tel.value for tel in vCard.contents['tel']]
                except:
                    continue
                    
                contactWithNumber = False
                i = 3
                for tel in numbers:
                    if i <= 6:
                        row[i] = clean_number(tel)
                        if row[i]: contactWithNumber = True
                        i += 1
                
                if contactWithNumber:
                    yield row
                    
    return carddav_generator(), csvHeader


def csv_writer(addrIterable, csvHeader):
    csvObject = io.StringIO()
    writer = csv.writer(csvObject, delimiter=';')
    writer.writerow(csvHeader)
    for addr in addrIterable:
        writer.writerow(addr)
    return csvObject

class Starface:
    def __init__(self, sfProto, sfServer, sfUser, sfPass):
        self.url = f'{sfProto}://{sfServer}'
        self.User = sfUser
        self.Pass = sfPass
        self.headers = {'Content-Type':'application/json', 'X-Version':'2'}
        self.session = requests.session()

        response = self.session.get(f'{self.url}/rest/login', headers=self.headers, verify=False)
        templateJson = json.loads(response.content)
        userandnonce=(self.User+templateJson['nonce']).encode(encoding='utf_8', errors='strict')
        hpassword=hashlib.sha512(self.Pass.encode(encoding='utf_8', errors='strict')).hexdigest()
        passwordHashed=hpassword.encode(encoding='utf_8')
        hsecret = hashlib.sha512(userandnonce+passwordHashed).hexdigest().encode(encoding='utf_8')
        secretCompound=self.User+':'+hsecret.decode(encoding='utf_8')
        templateJson['secret'] = secretCompound
        authTokenResponse = self.session.post(f'{self.url}/rest/login', data=json.dumps(templateJson), headers=self.headers)
        self.headers.update({'authToken':json.loads(authTokenResponse.content)['token']})
        
        response = self.session.get(f'{self.url}/rest/contacts/tags', data='', headers=self.headers)
        self.addrBook = json.loads(response.content)

    def add_or_update_contact(self, row, contact_id=None):
        contact={
        "blocks": [
            {  'name': 'contact',
               'attributes': [
                   { 'name': 'firstname', 'value': row[0] },
                   { 'name': 'familyname','value': row[1] },
                   { 'name': 'company',   'value': row[2] }
               ] },
            {  "name": "address",
               "attributes": [
                   { 'name': 'street',   'value': row[10] },
                   { 'name': 'postcode', 'value': row[8]  },
                   { 'name': 'city',     'value': row[9]  }
               ] },
            {  "name": "telephone",
               "attributes": [
                   { 'name': 'phone',    'value': row[3]  },
                   { 'name': 'homephone','value': row[4]  },
                   { 'name': 'mobile',   'value': row[5]  },
                   { 'name': 'fax',      'value': row[6]  }
               ] },
            {  "name": "email",
               "attributes": [
                   { 'name': 'e-mail',   'value': row[7]  }
               ] }
        ],
        "editable": "true",
        "tags": [
            {   'id': self.addrBook[0]['id'],
                'name': self.addrBook[0]['name'],
                'alias': self.addrBook[0]['alias'] }
        ],
        "id": contact_id if contact_id else ""
        }
        
        if contact_id:
            response = self.session.put(f'{self.url}/rest/contacts/{contact_id}', data=json.dumps(contact), headers=self.headers)
            print(f'Status: {response.status_code} updating: {row[1]}')
        else:
            response = self.session.post(f'{self.url}/rest/contacts', data=json.dumps(contact), headers=self.headers)
            print(f'Status: {response.status_code} adding: {row[1]}')
        
    def transfer(self, addrIterable):
        print("Starte Abgleich mit Starface (On-Demand Caching)...")
        
        # Lokaler Cache für alle Kontakte, die wir während des Laufs von Starface abfragen
        sf_cache = {}
        processed_inputs = set()
        
        for row in addrIterable:
            try:
                # Eindeutigen Schlüssel für den Input bilden
                input_key = f"{row[0]}_{row[1]}"
                if input_key in processed_inputs:
                    continue
                processed_inputs.add(input_key)
                
                search_name = row[1] # Der Nachname (oder Firmenname)
                
                # Wenn wir diesen Namen noch nie bei Starface gesucht haben, tun wir es jetzt
                if search_name not in sf_cache:
                    # Gezielte Suche nach dem Namen anstatt des gesamten Adressbuchs
                    url = f'{self.url}/rest/contacts?searchTerms={search_name}'
                    response = self.session.get(url, headers=self.headers)
                    
                    sf_cache[search_name] = [] # Leere Liste als Standard anlegen
                    
                    if response.status_code == 200:
                        search_data = json.loads(response.content)
                        if isinstance(search_data, dict):
                            found_contacts = search_data.get('contacts', [])
                        else:
                            found_contacts = search_data
                            
                        # Gefundene Starface-Kontakte in unseren lokalen Cache für diesen Namen übertragen
                        for c in found_contacts:
                            summary_list = c.get('summaryValues', [])
                            phone_list = c.get('phoneNumberValues', [])
                            
                            if not summary_list or not phone_list:
                                for block in c.get('blocks', []):
                                    if block.get('name') == 'contact':
                                        for attr in block.get('attributes', []):
                                            if attr.get('value'):
                                                summary_list.append(attr['value'])
                                    elif block.get('name') == 'telephone':
                                        for attr in block.get('attributes', []):
                                            if attr.get('value'):
                                                phone_list.append(attr['value'])

                            cleaned_sf_phones = set()
                            for p in phone_list:
                                cleaned_p = clean_number(p)
                                if cleaned_p:
                                    cleaned_sf_phones.add(cleaned_p)

                            sf_cache[search_name].append({
                                'id': c.get('id'),
                                'summary': summary_list,
                                'phones': cleaned_sf_phones
                            })

                # Jetzt haben wir garantiert alle Starface-Einträge zu diesem Namen im sf_cache[search_name]
                contactMissing = True
                new_phones = {row[3], row[4], row[5], row[6]} - {''}

                for c in sf_cache[search_name]:
                    # Prüfe Nachname (immer Pflicht)
                    if row[1] in c['summary']:
                        
                        # Prüfe Vorname NUR, wenn in der Quelle (row[0]) überhaupt einer steht.
                        # Wenn die Quelle keinen Vornamen hat, ignorieren wir, was Starface hat.
                        if row[0] and row[0] not in c['summary']:
                            continue # Das ist jemand anders, suche weiter
                            
                        contactMissing = False
                        matched_id = c['id']
                        
                        # Berechne die Differenz: Welche Nummern aus der Quelle fehlen im Starface-Cache?
                        # Wichtig: Leere Strings '' werden vorher schon entfernt.
                        missing_phones = new_phones - c['phones']
                        
                        if missing_phones:
                            self.add_or_update_contact(row, matched_id)
                            c['phones'].update(new_phones)
                        else:
                            print(f'Nothing changed: {row[1]}')
                        break
                        
                if contactMissing:
                    self.add_or_update_contact(row)
                    sf_cache[search_name].append({
                        'id': 'new',
                        'summary': [row[0], row[1]],
                        'phones': new_phones
                    })
                    
            except Exception as e:
                print(f"Fehler bei {row[1]}: {e}")

# ==========================================
# Skript Ausführung
# ==========================================

if __name__ == "__main__":
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    urllib3.disable_warnings(category=urllib3.exceptions.InsecureRequestWarning)

    a = parseArgs()

    if a.input == 'sql':
        addrIterable, csvHeader = read_sql(sqlServer, sqlUser, sqlPass, sqlDB)

    elif a.input == 'carddav':
        addrIterable, csvHeader = get_carddav(cdavUrl, user=cdavUser, passwd=cdavPass, auth=cdavAuth, verify=cdavVerify)

    if a.output == 'print':
        csvObject = csv_writer(addrIterable, csvHeader)
        print(csvObject.getvalue())

    elif a.output == 'csv':
        csvObject = csv_writer(addrIterable, csvHeader)
        with open('sql_output.csv', 'w', encoding="utf-8") as file:
            file.write(csvObject.getvalue())
            print('sql_output.csv written')

    elif a.output == 'starface':
        S = Starface(sfProto, sfServer, sfUser, sfPass)
        S.transfer(addrIterable)

 

Das Skript kann auch auf anderen Linux-Umgebungen (z.B. einem Proxmox Hypervisor) installiert werden. Es sind inkompatibilitäten mit Python 3.13 (Proxmox 9) bekannt: Die Rest Schnittstelle funktioniert nur noch, sofern die Starface über ein nicht selbst-signiertes Zertifikat verfügt. Wer eine schnelle Lösung hat, gerne per E-Mail zu mir.