Diese Dokumentation ist als Leitfaden für die Airbyte Connector Entwicklung gedacht.
Als Beispielfall ist hier die Entwicklung des Weclapp Connectors dargelegt.
Understand Airbyte
https://docs.airbyte.com/understanding-airbyte/beginners-guide-to-catalog
https://docs.airbyte.com/understanding-airbyte/airbyte-protocol
Build a Connector with Python CDK
https://docs.airbyte.com/connector-development/
https://docs.airbyte.com/connector-development/cdk-python/
https://docs.airbyte.com/connector-development/tutorials/cdk-tutorial-python-http/getting-started - HTTP API Source Exchange Rates API Example
Weiteres Connector Beispiel - Stripe als Source https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-stripe
Share feedback and requests with us on our Slack channel or email us at feedback@airbyte.io
weclapp Connector
Diese Dokumentation entstand aus und bezieht sich immer wieder auf das Projekt einen Airbyte Connector für weclapp als Source zu entwickeln.
Prerequisites Python CDK (Connector Development Kit)
Python >= 3.9
Docker
NodeJS (wird zurzeit noch benötigt, aber soll bald entfernt werden)
Getting Started
Mithilfe des Code Generators lässt sich ein Template Connector erstellen. Dazu muss zuerst das Airbyte Repository geklont werden.
git clone --depth 1 https://github.com/airbytehq/airbyte/ cd airbyte
Danach von der Repository Root in den Ordner mit dem generate.sh Skript wechseln und es starten:
cd airbyte-integrations/connector-templates/generator ./generate.sh
Darauf muss im Dialog die Art des Connectors (z.B. Python HTTP API Source) ausgewählt werden und der Connector benannt werden.
Nun in den Ordner des erstellten Connectors wechseln.
cd ../../connectors/<erstellterConnector>
Set up des initialen Python Environments (auch in der README des erstellten Connectors zu finden):
python3 -m venv .venv # create venv source .venv/bin/activate # activate venv pip install -r requirements.txt # install requirements
Die requirements.txt nicht anfassen. Sie beinhaltet Airbyte Dependencies.
Eigene Dependencies werden in die setup.py eingetragen.
Start des Development am Connector
In dem Ordner airbyte/airbyte-integrations/connectors/<erstellterConnector> sind TODOs in den einzelnen Dateien zu finden. In der Theorie sollten alle TODOs ausgefüllt werden und danach sollte der Connector funktionieren. Hilfreicher ist es sich an der folgenden Reihenfolge zu orientieren:
Define Inputs in spec.yaml
In der spec.yaml (im Ordner airbyte/airbyte-integrations/connectors/<erstellterConnector>/<erstellterConnector>) müssen die notwendigen Informationen als Parameter hinterlegt und definiert werden, die zum Zugriff auf die API der Datenquelle benötigt werden. Die hier hinterlegten Parameter werden in der Airbyte UI beim Hinzufügen des Connectors erfragt.
In der config.json (im Ordner airbyte/airbyte-integrations/connectors/<erstellterConnector>/<erstellterConnector>/secrets) müssen dann die in der spec.yaml angegeben notwendigen Informationen tatsächlich eingetragen werden.
Für weclapp ist das der API Token, der den Zugriff auf die weclapp API ermöglicht. In der spec.yaml macht es natürlich auch Sinn weitere Parameter zu hinterlegen, wie zum Beispiel die Tenant Url.
spec.yaml für den weclapp Connector:
documentationUrl: https://docsurl.com connectionSpecification: $schema: http://json-schema.org/draft-07/schema# title: Weclapp Spec type: object required: - api_token properties: # This schema defines the configuration required for the source. # This usually involves metadata such as database and/or authentication information: api_token: type: string title: API Token description: API Token to access weclapp API airbyte_secret: true
Mit dem folgenden Command kann man sich die spec ausgeben lassen und prüfen, ob alles bis hierhin funktioniert hat.
python3 main.py spec
Check Connection Function in source.py
Die Funktion check_connection
aus class SourceWeclapp
in source.py prüft die Verbindung zu der API der Datenquelle mithilfe der spec.yaml und config.json.
Beispiel für den weclapp Connector:
def check_connection(self, logger, config) -> Tuple[bool, any]: """ Connection check to validate that the user-provided config can be used to connect to the underlying API :param config: the user-input config object conforming to the connector's spec.yaml :param logger: logger object :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise. """ r = requests.get( "{tenant_url}webapp/api/v1/user", headers={ "AuthenticationToken": config["api_token"] }, ) http_response_code = r.status_code response = r.json() print(f"http_response_code: {http_response_code} \n" f"response: {response} \n") if http_response_code == 200: return True, None else: return False, f"response was {response}"
Mit dem folgenden Command ruft man die Funktion check_connection
auf, die die Verbindung zu der API der Datenquelle mithilfe der spec.yaml und config.json prüft.
python3 main.py check --config secrets/config.json
Declare the Streams & their Schema
Als nächstes müssen die Streams und deren Schemas definiert werden. Dafür müssen für jeden Stream unseres Connectors zwei Schritte absolviert werden:
In source.py muss eine Python class für den Stream erstellt werden, die HttpStream erweitert. Hier wird der primary Key, der Endpunkt des Streams und ggf. weitere Infos angegeben.
In den Ordner airbyte/airbyte-integrations/connectors/<erstellterConnector>/<erstellterConnector>/schemas muss eine <stream_name>.json Datei angelegt werden. Diese Datei beschreibt den Output des Streams (zum Beispiel den Output einer API GET Methode). Der Name der Datei muss im snake_case erfolgen.
Für den ersten Schritt geht man in die source.py und erstellt eine class für den Stream.
Beispiel für den Stream user der weclapp API:
class User(WeclappStream): """ stream that corresponds to data source user """ # email (string) primary_key = "email" def path( self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None ) -> str: # url is https://<tenant>/webapp/api/v1/user so return "user" return "user"
Dann fügt man in der Funktion streams
in der class SourceWeclapp
dem return den Stream hinzu.
def streams(self, config: Mapping[str, Any]) -> List[Stream]: """ :param config: A Mapping of the user input configuration as defined in the connector spec. """ return [ User(config["api_token"]) ]
I
m zweiten Schritt hat man mehrere Möglichkeiten https://docs.airbyte.com/connector-development/cdk-python/schemas/. Natürlich kann man alle schemas in .json-Dateien manuell erstellen, aber es gibt auch zwei einfachere Vorgehensweisen. Die erste Vorgehensweise openapi2jsonschema wurde genutzt und wird folgend kurz beschrieben.
Option 1 airbyte/tools/openapi2jsonschema at master · airbytehq/airbyte
Option 2 airbyte/tools/schema_generator at master · airbytehq/airbyte
Im Ordner airbyte/tools/openapi2jsonschema/ kann man mit dem run.sh Skript aus einer swagger.json Datei schemas erstellen lassen. Die erstellen schemas werden im Ordner “schemas” im aktuellen Arbeitsordner abgelegt.
tools/openapi2jsonschema/run.sh <path to OpenAPI definition file>
Da man jetzt die Schemas definiert hat, kann man sich mit dem folgenden Command den AirbyteCatalog ausgeben lassen.
python3 main.py discover --config secrets/config.json
Den AirbyteCatalog speichert man nun wie ausgegeben in der Datei airbyte/airbyte-integrations/connectors/<erstellterConnector>/integration_tests/catalog.json ab. An der catalog.json muss danach nichts händisch verändert werden.
Nun muss der AirbyteCatalog konfiguriert werden, das geschieht in der Datei airbyte/airbyte-integrations/connectors/<erstellterConnector>/integration_tests/configured_catalog.json. Dies geschieht nach dem folgendem Muster:
{ "streams" : [ { "stream" : { // stream a aus catalog.json einfüllen }, // sync mode etc. für stream a konfigurieren, beispielsweise "sync_mode" : "full_refresh", "destination_sync_mode" : "overwrite", "primary_key" : [["id"]] }, { "stream" : { // stream b aus catalog.json einfüllen }, // sync mode etc. für stream b konfigurieren, beispielsweise "sync_mode" : "full_refresh", "destination_sync_mode" : "overwrite", "primary_key" : [["email"]] } ] }
Read Data
Bevor man tatsächlich Daten aus der Quelle lesen kann, müssen noch ein paar Funktionen in der abstract base class WeclappStream
in der source.py ausgefüllt werden.
Die
__init__
Funktion muss ausgefüllt werden (Variablen initialisieren, die als Input Parameter hereinkamen).
def __init__(self, api_token: str, **kwargs): super().__init__() self.api_token = api_token
In der Funktion
request_headers
müssen die benötigten Einträge für den Request-Header zurückgegeben werden.
def request_headers( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None ) -> Mapping[str, Any]: """ This method defines the request header. :return necessary content of the request header """ return {"AuthenticationToken": self.api_token}
Die Funktion
parse_response
verarbeitet die empfangene Antwort und gibt sie zurück. Der Generatoryield
ist zur Rückgabe sehr gut geeignet.
def parse_response( self, response: requests.Response, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, ) -> Iterable[Mapping]: """ This method defines how a response is parsed. :return an iterable containing each record in the response """ # response.json returns a dict with a list of dicts, e.g. {'result': [{'id': '3012', 'createdDate': 1669121480169}]} # We only need the list of dicts. The yield generator creates a fitting response_json. response_json = response.json() yield from response_json.get("result", [])
Funktion
next_page_token
kümmert sich um die Pagination (nötig, damit nicht nur die erste Page der Query gelesen wird)
Pagination für die Queries TODO
Mit dem folgenden Command kann man nun eine read-Operation durchführen und testen, ob alles richtig konfiguriert ist und funktioniert.
python3 main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json
Locally Running the Connector Docker Image
Nun kann man aus dem Code des Connectors ein Docker Image erstellen lassen. Die für das Projekt passenden Commands sind auch im README des jeweiligen Projekts zu finden. Nach dem Bauen des Images kann man die zu oben korrespondierenden Befehle (spec, check, discover und read) auch mit dem Docker Image ausführen lassen.
# build docker image docker build . -t airbyte/source-weclapp:dev # commands docker run --rm airbyte/source-weclapp:dev spec docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-weclapp:dev check --config /secrets/config.json docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-weclapp:dev discover --config /secrets/config.json docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-weclapp:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
Push Connector Image to DockerHub
Damit wir unseren erstellten Connector in Airbyte hinzufügen können, müssen wir das Docker Image in einem Repository auf DockerHub zur Verfügung stellen, https://docs.docker.com/docker-hub/repos/.
Als Tag lieber eine Versionsnummer wie 0.2.0 verwenden anstelle von latest, da somit das Updaten des Connectors in der Airbyte UI einfacher wird.
# create image for repository from existing dev image docker tag <existing-dev-image> <hub-user>/<repo-name>[:<tag>] # push image to DockerHub repository docker push <image> # z.B. aettingshausen/airbyte-connector-source-weclapp:0.2.0
Build and Push Docker Image for amd & arm → the way to go !
Auf dem MacBook mit M1 Chip werden Docker Images für arm erstellt. Wenn man die VM auf einer amd-Architektur hochzieht, bekommt man beim Hinzufügen der neuen Source die Fehlermeldung “Internal Server Error: Get Spec job failed“. Man benötigt ein Docker Image für die passende Architektur. Mit den Experimental Features von Docker lassen sich Docker Images für mehrere Architekturen erstellen. Also am besten sowohl für arm als auch amd ein Image erstellen lassen → Anleitung How to build x86 (and others!) Docker images on an M1 Mac:
TLDR; Version
Open the Docker Desktop dashboard then open up Preferences (cog icon). Go to “Experimental Features” then turn it on and apply it.
Next create a new builder instance with
docker buildx create --use
. This lets you specify multiple docker platforms at once.To build your Dockerfile for typical x86 systems and Apple Silicon Macs, run
docker buildx build --platform linux/amd64,linux/arm64 --push -t <tag_to_push> .
Done. Please note that you have to push directly to a repository if you want Docker Desktop to automatically manage the manifest list for you (this is probably something you want). Read the paragraph below to find out why.
Once the build is finished, it will be automatically uploaded to your configured registry. Docker will also automatically manage the manifest list for you. This allows Docker to combine the separate builds for each architecture into a single “manifest”. This means users can do a normal docker pull <image>
and the Docker client will automatically work out the correct image for their CPU architecture – pretty neat!
Adding a New Connector
(Step 11: Add the connector to the API/UI | Airbyte Documentation)
Neue oder custom Connectoren können einfach über die Airbyte UI hinzugefügt werden, siehe dazu https://docs.airbyte.com/integrations/custom-connectors.
Testing
Die Standard-Tests haben erst einmal ausgereicht → next step genauer reinschauen
Deploy Airbyte Open Source
Deploy Airbyte | Airbyte Documentation
git clone https://github.com/airbytehq/airbyte.git cd airbyte docker-compose up
Once you see an Airbyte banner, the UI is ready to go at http://localhost:8000! You will be asked for a username and password. By default, that's username “airbyte” and password “password”. Once you deploy airbyte to your servers, be sure to change these in your .env file.
Build a Connector with Low-Code Approach
Der Low-Code Ansatz bietet zum einen ein Low-Code Connector Development Kit (CDK) und ein UI Builder. Beide befinden sich noch in der Alpha, d.h. sie befinden sich im aktiven Development und es können “backward-incompatible changes” auftreten.
Aus diesem Grund und dem Gefühl mit der Python CDK Airbyte und das Connector Development besser lernen und verstehen zu können, wurde die Python CDK für den ersten Versuch gewählt.
Airbyte Tutorialvideo Building with Airbyte got a whole lot FASTER!! Low Code CDK Tutorial (Part 1)
weclapp
weclapp Testinstanz
Unter https://www.weclapp.com/de/registrieren/ kann man eine kostenlose 30-tägige Testversion von weclapp starten.
weclapp REST API
Mit der weclapp REST API kann man weclapp in andere Systeme integrieren und nutzen.
Die Dokumentation der API ist unter https://www.weclapp.com/api/ oder im weclapp Tenant unter “https://<tenant>.weclapp.com/webapp/view/api/” einzusehen. Im Tenant selbst sieht man nur die API Methoden, die auch für die eigenen Nutzerrechte freigeschaltet sind.
Unter https://www.weclapp.com/api/ oder “https://<tenant>.weclapp.com/webapp/view/api/” kann man auch die swagger.json herunterladen. Eine swagger.json Datei beschreibt alle verfügbaren Ressourcen und Operationen der REST API.
Die Base URL der API ist https://<tenant>.weclapp.com/webapp/api/v1/
Es ist eine JSON-only API. Der Header aller Requests muss ein "Accept: application/json" enthalten und alle Header von PUT und POST Operationen benötigen ein “Content-Type: application/json”.
API Requests können nur von verifizierten Usern erhoben werden. User können sich mithilfe eines API Token gegenüber der API autorisieren. Diesen API Token kann man sich in seinem weclapp Account unter My Settings > API Token erstellen lassen. Der API Token beinhaltet nur Zugriff auf die API Methoden, die auch für die eigenen Nutzerrechte freigeschaltet sind.
Die Authentifizierung ist in 2 Wegen möglich:Der API Token kann mit dem AuthenticationToken Header “AuthenticationToken: {api_token}” gesendet werden.
Die Standard HTTP Basic Authentifizierung mit username = “*” und password = {api_token} kann verwendet werden.
curl Command zum Testen der Verbindung zur weclapp API im Terminal:
curl -X GET -H Accept:application/json -H AuthenticationToken:<api_token> https://<tenant>.weclapp.com/webapp/api/v1/user
Fragen an weclapp Support bzgl. API und KPIs:
Welche Rolle benötige ich in weclapp, damit ich mit meinem API Token auf alle API Methoden zugreifen kann?
Über welche API Methode(n) + JSON-Key kann ich den Gesamtumsatz auslesen? Hier wäre es auch wichtig, den Umsatz für verschiedene Zeitpunkte bestimmen zu können.
Über welche API Methode(n) + JSON-Key kann ich die Anzahl und Summe der Ausgangsrechnungen bestimmen? Hier wären auch wieder verschiedene Zeitpunkte gut zu haben.
Antwort des Supports:
Was Sie über die API bearbeiten können ist abhängig von den Berechtigungen in weclapp. Wenn Sie in weclapp bspw. die Berechtigung haben, eine Einkaufsrechnung bearbeiten zu können, dann können Sie das über die API auch tun. Damit alle Methoden angewandt werden können, empfiehlt es sich auch alle Berechtigungen zu aktivieren (also jeweils Administrator-Rechte im Bereich Warenwirtschaft, Buchhaltung, Projektmanagement, Organizer, CRM, Helpdesk und Vertragswesen).
Rechnungen können grundsätzlich mit /salesInvoice abgerufen werden. Filtern nach dem Datum ist per ?invoiceDate-gt=1616672697237&invoiceDate-lt=1666875377295 möglich (gt steht für greater than und lt für less than). Zur Angabe der Zeitpunkte müssten Sie hier schauen: https://www.unixtimestamp.com/
Zusätzlich können Sie über paymentStatus-in=[PAID, OPEN] nach dem Status der Rechnung filtern (Paid und Open habe ich hier als Beispiel von möglichen Werten genommen).
Nach dem Betrag können Sie filtern, wenn Sie die entsprechende property mitgeben, Sie müssten das über die genutzte Schnittstelle anschließend nur noch addieren lassen.
Der komplette Request kann dann bspw. so aussehen:
/salesInvoice?invoiceDate-gt=1616672697237&invoiceDate-lt=1666875377295&properties=netAmount&paymentStatus-in=[PAID, OPEN]
Die hier genutzten und alle weiteren möglichen Methoden finden Sie auch hier: https://doc.weclapp.com/knowledgebase/gibt-es-eine-api-zu-weclapp/
Link zu dieser Seite: https://seibert.biz/7j1