Diese Dokumentation ist als Leitfaden für die Airbyte Connector Entwicklung gedacht.
Als Beispielfall ist hier die Entwicklung des Weclapp Connectors dargelegt.

Understand Airbyte

https://docs.airbyte.com/

https://docs.airbyte.com/understanding-airbyte/beginners-guide-to-catalog

https://docs.airbyte.com/understanding-airbyte/airbyte-protocol


Build a Connector with Python CDK

https://docs.airbyte.com/connector-development/

https://docs.airbyte.com/connector-development/cdk-python/

https://docs.airbyte.com/connector-development/tutorials/cdk-tutorial-python-http/getting-started - HTTP API Source Exchange Rates API Example

Weiteres Connector Beispiel - Stripe als Source https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-stripe

Share feedback and requests with us on our Slack channel or email us at feedback@airbyte.io


weclapp Connector

Diese Dokumentation entstand aus und bezieht sich immer wieder auf das Projekt einen Airbyte Connector für weclapp als Source zu entwickeln.


Prerequisites Python CDK (Connector Development Kit)

  • Python >= 3.9

  • Docker

  • NodeJS (wird zurzeit noch benötigt, aber soll bald entfernt werden)


Getting Started

Mithilfe des Code Generators lässt sich ein Template Connector erstellen. Dazu muss zuerst das Airbyte Repository geklont werden.

git clone --depth 1 https://github.com/airbytehq/airbyte/ 
cd airbyte
 

Danach von der Repository Root in den Ordner mit dem generate.sh Skript wechseln und es starten:

cd airbyte-integrations/connector-templates/generator
./generate.sh

Darauf muss im Dialog die Art des Connectors (z.B. Python HTTP API Source) ausgewählt werden und der Connector benannt werden.

Nun in den Ordner des erstellten Connectors wechseln.

cd ../../connectors/<erstellterConnector>

Set up des initialen Python Environments (auch in der README des erstellten Connectors zu finden):

python3 -m venv .venv  # create venv
source .venv/bin/activate  # activate venv
pip install -r requirements.txt  # install requirements

Die requirements.txt nicht anfassen. Sie beinhaltet Airbyte Dependencies.
Eigene Dependencies werden in die setup.py eingetragen.

Start des Development am Connector

In dem Ordner airbyte/airbyte-integrations/connectors/<erstellterConnector> sind TODOs in den einzelnen Dateien zu finden. In der Theorie sollten alle TODOs ausgefüllt werden und danach sollte der Connector funktionieren. Hilfreicher ist es sich an der folgenden Reihenfolge zu orientieren:


Define Inputs in spec.yaml

In der spec.yaml (im Ordner airbyte/airbyte-integrations/connectors/<erstellterConnector>/<erstellterConnector>) müssen die notwendigen Informationen als Parameter hinterlegt und definiert werden, die zum Zugriff auf die API der Datenquelle benötigt werden. Die hier hinterlegten Parameter werden in der Airbyte UI beim Hinzufügen des Connectors erfragt.

In der config.json (im Ordner airbyte/airbyte-integrations/connectors/<erstellterConnector>/<erstellterConnector>/secrets) müssen dann die in der spec.yaml angegeben notwendigen Informationen tatsächlich eingetragen werden.

Für weclapp ist das der API Token, der den Zugriff auf die weclapp API ermöglicht. In der spec.yaml macht es natürlich auch Sinn weitere Parameter zu hinterlegen, wie zum Beispiel die Tenant Url.

spec.yaml für den weclapp Connector:

documentationUrl: https://docsurl.com
connectionSpecification:
  $schema: http://json-schema.org/draft-07/schema#
  title: Weclapp Spec
  type: object
  required:
    - api_token
  properties:
    # This schema defines the configuration required for the source.
    # This usually involves metadata such as database and/or authentication information:
    api_token:
      type: string
      title: API Token
      description: API Token to access weclapp API
      airbyte_secret: true

Mit dem folgenden Command kann man sich die spec ausgeben lassen und prüfen, ob alles bis hierhin funktioniert hat.

python3 main.py spec

Check Connection Function in source.py

Die Funktion check_connection aus class SourceWeclapp in source.py prüft die Verbindung zu der API der Datenquelle mithilfe der spec.yaml und config.json.

Beispiel für den weclapp Connector:

def check_connection(self, logger, config) -> Tuple[bool, any]:
        """
        Connection check to validate that the user-provided config can be used to connect to the underlying API

        :param config:  the user-input config object conforming to the connector's spec.yaml
        :param logger:  logger object
        :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
        """
        r = requests.get(
            "{tenant_url}webapp/api/v1/user",
            headers={
                "AuthenticationToken": config["api_token"]
            },
        )

        http_response_code = r.status_code
        response = r.json()
        print(f"http_response_code: {http_response_code} \n"
              f"response: {response} \n")

        if http_response_code == 200:
            return True, None
        else:
            return False, f"response was {response}"

Mit dem folgenden Command ruft man die Funktion check_connection auf, die die Verbindung zu der API der Datenquelle mithilfe der spec.yaml und config.json prüft.

python3 main.py check --config secrets/config.json

Declare the Streams & their Schema

Als nächstes müssen die Streams und deren Schemas definiert werden. Dafür müssen für jeden Stream unseres Connectors zwei Schritte absolviert werden:

  • In source.py muss eine Python class für den Stream erstellt werden, die HttpStream erweitert. Hier wird der primary Key, der Endpunkt des Streams und ggf. weitere Infos angegeben.

  • In den Ordner airbyte/airbyte-integrations/connectors/<erstellterConnector>/<erstellterConnector>/schemas muss eine <stream_name>.json Datei angelegt werden. Diese Datei beschreibt den Output des Streams (zum Beispiel den Output einer API GET Methode). Der Name der Datei muss im snake_case erfolgen.

Für den ersten Schritt geht man in die source.py und erstellt eine class für den Stream.

Beispiel für den Stream user der weclapp API:

class User(WeclappStream):
    """
    stream that corresponds to data source user
    """

    # email (string)
    primary_key = "email"

    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None
    ) -> str:

        # url is https://<tenant>/webapp/api/v1/user so return "user"
        return "user"

Dann fügt man in der Funktion streams in der class SourceWeclapp dem return den Stream hinzu.

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        :param config: A Mapping of the user input configuration as defined in the connector spec.
        """
        
        return [
            User(config["api_token"])
        ]

I

m zweiten Schritt hat man mehrere Möglichkeiten https://docs.airbyte.com/connector-development/cdk-python/schemas/. Natürlich kann man alle schemas in .json-Dateien manuell erstellen, aber es gibt auch zwei einfachere Vorgehensweisen. Die erste Vorgehensweise openapi2jsonschema wurde genutzt und wird folgend kurz beschrieben.

Option 1 airbyte/tools/openapi2jsonschema at master · airbytehq/airbyte

Option 2 airbyte/tools/schema_generator at master · airbytehq/airbyte

Im Ordner airbyte/tools/openapi2jsonschema/ kann man mit dem run.sh Skript aus einer swagger.json Datei schemas erstellen lassen. Die erstellen schemas werden im Ordner “schemas” im aktuellen Arbeitsordner abgelegt.

tools/openapi2jsonschema/run.sh <path to OpenAPI definition file>

Da man jetzt die Schemas definiert hat, kann man sich mit dem folgenden Command den AirbyteCatalog ausgeben lassen.

python3 main.py discover --config secrets/config.json

Den AirbyteCatalog speichert man nun wie ausgegeben in der Datei airbyte/airbyte-integrations/connectors/<erstellterConnector>/integration_tests/catalog.json ab. An der catalog.json muss danach nichts händisch verändert werden.

Nun muss der AirbyteCatalog konfiguriert werden, das geschieht in der Datei airbyte/airbyte-integrations/connectors/<erstellterConnector>/integration_tests/configured_catalog.json. Dies geschieht nach dem folgendem Muster:

{
  "streams" : [
    {
      "stream" : {
        // stream a aus catalog.json einfüllen
      },
      // sync mode etc. für stream a konfigurieren, beispielsweise
      "sync_mode" : "full_refresh",
      "destination_sync_mode" : "overwrite",
      "primary_key" : [["id"]]
    },
    {
      "stream" : {
        // stream b aus catalog.json einfüllen
      },
      // sync mode etc. für stream b konfigurieren, beispielsweise
      "sync_mode" : "full_refresh",
      "destination_sync_mode" : "overwrite",
      "primary_key" : [["email"]]
    }
  ]
}

Read Data

Bevor man tatsächlich Daten aus der Quelle lesen kann, müssen noch ein paar Funktionen in der abstract base class WeclappStream in der source.py ausgefüllt werden.

  • Die __init__ Funktion muss ausgefüllt werden (Variablen initialisieren, die als Input Parameter hereinkamen).

def __init__(self, api_token: str, **kwargs):
        super().__init__()
        self.api_token = api_token
  • In der Funktion request_headers müssen die benötigten Einträge für den Request-Header zurückgegeben werden.

def request_headers(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None
    ) -> Mapping[str, Any]:
        """
        This method defines the request header.
        :return necessary content of the request header
        """
        return {"AuthenticationToken": self.api_token}
  • Die Funktion parse_response verarbeitet die empfangene Antwort und gibt sie zurück. Der Generator yield ist zur Rückgabe sehr gut geeignet.

def parse_response(
        self,
        response: requests.Response,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> Iterable[Mapping]:
        """
        This method defines how a response is parsed.
        :return an iterable containing each record in the response
        """
        # response.json returns a dict with a list of dicts, e.g. {'result': [{'id': '3012', 'createdDate': 1669121480169}]}
        # We only need the list of dicts. The yield generator creates a fitting response_json.
        response_json = response.json()
        yield from response_json.get("result", [])
  • Funktion next_page_token kümmert sich um die Pagination (nötig, damit nicht nur die erste Page der Query gelesen wird)

Pagination für die Queries TODO

Mit dem folgenden Command kann man nun eine read-Operation durchführen und testen, ob alles richtig konfiguriert ist und funktioniert.

python3 main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json

Locally Running the Connector Docker Image

Nun kann man aus dem Code des Connectors ein Docker Image erstellen lassen. Die für das Projekt passenden Commands sind auch im README des jeweiligen Projekts zu finden. Nach dem Bauen des Images kann man die zu oben korrespondierenden Befehle (spec, check, discover und read) auch mit dem Docker Image ausführen lassen.

# build docker image
docker build . -t airbyte/source-weclapp:dev

# commands
docker run --rm airbyte/source-weclapp:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-weclapp:dev check --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-weclapp:dev discover --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-weclapp:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json


Push Connector Image to DockerHub

Damit wir unseren erstellten Connector in Airbyte hinzufügen können, müssen wir das Docker Image in einem Repository auf DockerHub zur Verfügung stellen, https://docs.docker.com/docker-hub/repos/.
Als Tag lieber eine Versionsnummer wie 0.2.0 verwenden anstelle von latest, da somit das Updaten des Connectors in der Airbyte UI einfacher wird.

# create image for repository from existing dev image 
docker tag <existing-dev-image> <hub-user>/<repo-name>[:<tag>]

# push image to DockerHub repository
docker push <image>  # z.B. aettingshausen/airbyte-connector-source-weclapp:0.2.0

Build and Push Docker Image for amd & arm → the way to go !

Auf dem MacBook mit M1 Chip werden Docker Images für arm erstellt. Wenn man die VM auf einer amd-Architektur hochzieht, bekommt man beim Hinzufügen der neuen Source die Fehlermeldung “Internal Server Error: Get Spec job failed“. Man benötigt ein Docker Image für die passende Architektur. Mit den Experimental Features von Docker lassen sich Docker Images für mehrere Architekturen erstellen. Also am besten sowohl für arm als auch amd ein Image erstellen lassen → Anleitung How to build x86 (and others!) Docker images on an M1 Mac:

TLDR; Version

  1. Open the Docker Desktop dashboard then open up Preferences (cog icon). Go to “Experimental Features” then turn it on and apply it.

  2. Next create a new builder instance with docker buildx create --use. This lets you specify multiple docker platforms at once.

  3. To build your Dockerfile for typical x86 systems and Apple Silicon Macs, run docker buildx build --platform linux/amd64,linux/arm64 --push -t <tag_to_push> .

  4. Done. Please note that you have to push directly to a repository if you want Docker Desktop to automatically manage the manifest list for you (this is probably something you want). Read the paragraph below to find out why.

Once the build is finished, it will be automatically uploaded to your configured registry. Docker will also automatically manage the manifest list for you. This allows Docker to combine the separate builds for each architecture into a single “manifest”. This means users can do a normal docker pull <image> and the Docker client will automatically work out the correct image for their CPU architecture – pretty neat!


Adding a New Connector

(Step 11: Add the connector to the API/UI | Airbyte Documentation)

Neue oder custom Connectoren können einfach über die Airbyte UI hinzugefügt werden, siehe dazu https://docs.airbyte.com/integrations/custom-connectors.


Testing

Die Standard-Tests haben erst einmal ausgereicht → next step genauer reinschauen

Deploy Airbyte Open Source 

Deploy Airbyte | Airbyte Documentation

git clone https://github.com/airbytehq/airbyte.git
cd airbyte
docker-compose up

Once you see an Airbyte banner, the UI is ready to go at http://localhost:8000! You will be asked for a username and password. By default, that's username “airbyte” and password “password”. Once you deploy airbyte to your servers, be sure to change these in your .env file.

Build a Connector with Low-Code Approach

Der Low-Code Ansatz bietet zum einen ein Low-Code Connector Development Kit (CDK) und ein UI Builder. Beide befinden sich noch in der Alpha, d.h. sie befinden sich im aktiven Development und es können “backward-incompatible changes” auftreten.

Aus diesem Grund und dem Gefühl mit der Python CDK Airbyte und das Connector Development besser lernen und verstehen zu können, wurde die Python CDK für den ersten Versuch gewählt.

Airbyte Tutorialvideo Building with Airbyte got a whole lot FASTER!! Low Code CDK Tutorial (Part 1)



weclapp

weclapp Testinstanz

Unter https://www.weclapp.com/de/registrieren/ kann man eine kostenlose 30-tägige Testversion von weclapp starten. 

weclapp REST API

  • Mit der weclapp REST API kann man weclapp in andere Systeme integrieren und nutzen.

  • Die Dokumentation der API ist unter https://www.weclapp.com/api/ oder im weclapp Tenant unter “https://<tenant>.weclapp.com/webapp/view/api/” einzusehen. Im Tenant selbst sieht man nur die API Methoden, die auch für die eigenen Nutzerrechte freigeschaltet sind.

  • Unter https://www.weclapp.com/api/ oder “https://<tenant>.weclapp.com/webapp/view/api/” kann man auch die swagger.json herunterladen. Eine swagger.json Datei beschreibt alle verfügbaren Ressourcen und Operationen der REST API.

  • Die Base URL der API ist https://<tenant>.weclapp.com/webapp/api/v1/

  • Es ist eine JSON-only API. Der Header aller Requests muss ein "Accept: application/json" enthalten und alle Header von PUT und POST Operationen benötigen ein “Content-Type: application/json”.

  • API Requests können nur von verifizierten Usern erhoben werden. User können sich mithilfe eines API Token gegenüber der API autorisieren. Diesen API Token kann man sich in seinem weclapp Account unter My Settings > API Token erstellen lassen. Der API Token beinhaltet nur Zugriff auf die API Methoden, die auch für die eigenen Nutzerrechte freigeschaltet sind.
    Die Authentifizierung ist in 2 Wegen möglich:

    • Der API Token kann mit dem AuthenticationToken Header “AuthenticationToken: {api_token}” gesendet werden.

    • Die Standard HTTP Basic Authentifizierung mit username = “*” und password = {api_token} kann verwendet werden.

curl Command zum Testen der Verbindung zur weclapp API im Terminal:

curl -X GET -H Accept:application/json -H AuthenticationToken:<api_token> https://<tenant>.weclapp.com/webapp/api/v1/user

Fragen an weclapp Support bzgl. API und KPIs:

  • Welche Rolle benötige ich in weclapp, damit ich mit meinem API Token auf alle API Methoden zugreifen kann?

  • Über welche API Methode(n) + JSON-Key kann ich den Gesamtumsatz auslesen? Hier wäre es auch wichtig, den Umsatz für verschiedene Zeitpunkte bestimmen zu können.

  • Über welche API Methode(n) + JSON-Key kann ich die Anzahl und Summe der Ausgangsrechnungen bestimmen? Hier wären auch wieder verschiedene Zeitpunkte gut zu haben.

Antwort des Supports:

Was Sie über die API bearbeiten können ist abhängig von den Berechtigungen in weclapp. Wenn Sie in weclapp bspw. die Berechtigung haben, eine Einkaufsrechnung bearbeiten zu können, dann können Sie das über die API auch tun. Damit alle Methoden angewandt werden können, empfiehlt es sich auch alle Berechtigungen zu aktivieren (also jeweils Administrator-Rechte im Bereich Warenwirtschaft, Buchhaltung, Projektmanagement, Organizer, CRM, Helpdesk und Vertragswesen).

Rechnungen können grundsätzlich mit /salesInvoice abgerufen werden. Filtern nach dem Datum ist per ?invoiceDate-gt=1616672697237&invoiceDate-lt=1666875377295 möglich (gt steht für greater than und lt für less than). Zur Angabe der Zeitpunkte müssten Sie hier schauen: https://www.unixtimestamp.com/

Zusätzlich können Sie über paymentStatus-in=[PAID, OPEN] nach dem Status der Rechnung filtern (Paid und Open habe ich hier als Beispiel von möglichen Werten genommen).

Nach dem Betrag können Sie filtern, wenn Sie die entsprechende property mitgeben, Sie müssten das über die genutzte Schnittstelle anschließend nur noch addieren lassen.

Der komplette Request kann dann bspw. so aussehen:

/salesInvoice?invoiceDate-gt=1616672697237&invoiceDate-lt=1666875377295&properties=netAmount&paymentStatus-in=[PAID, OPEN]

Die hier genutzten und alle weiteren möglichen Methoden finden Sie auch hier: https://doc.weclapp.com/knowledgebase/gibt-es-eine-api-zu-weclapp/



Link zu dieser Seite: https://seibert.biz/7j1

  • No labels
Diese Seite wurde zuletzt am 26.09.2024 geändert.