Skip to content

Module 4 of 15 · 📖 10 min read · ⏱ 30 min total

FI-DPA 04 ETL-ELT-Strecken

Table of contents (7 sections)
  1. Konzepte und Hintergrund
  2. Architektur-Diagramm
  3. Praktische Schritte
  4. Häufige Fallstricke
  5. Weiterführende Ressourcen
  6. Worked Example: Idempotente ELT-Pipeline für Umsatzdaten mit dbt und Airflow
  7. Wissens-Check

FI-DPA 04 ETL/ELT-Strecken

ETL/ELT-Prozesse bilden das Rückgrat moderner Datenintegration. In diesem Modul erlernen Sie die Konzepte zur Extraktion, Transformation und Laden von Daten zwischen verschiedenen Systemen. Sie verstehen, wie Staging-Areas zur Qualitätsicherung eingesetzt werden und warum Idempotenz in der Datenverarbeitung entscheidend ist. Praktische Beispiele mit Airflow, dbt und Talend runden Ihr Wissen ab.

Sie werden in der Lage sein, ETL/ELT-Pipelines zu entwerfen, zu implementieren und zu warten. Der Fokus liegt auf robusten, wartbaren und performanten Lösungen für den Einsatz in KMU-Umgebungen.

Konzepte und Hintergrund

ETL (Extract, Transform, Load)
Der klassische Ansatz, bei dem Daten zuerst extrahiert, dann in einer separaten Datenbank transformiert und schließlich in das Zielsystem geladen werden. Die Transformation erfolgt vor dem Laden in das Zielsystem.
ELT (Extract, Load, Transform)
Ein moderner Ansatz, bei dem Daten direkt in das Zielsystem (meist ein Data Warehouse) geladen und dort transformiert werden. Dies nutzt die Skalierbarkeit moderner Data-Warehouses und ermöglicht flexiblere Transformationen.
Staging-Area
Ein temporärer Speicherbereich, der als Zwischenstation für Daten während des ETL-Prozesses dient. Hier werden Daten bereinigt, validiert und aufbereitet, bevor sie in das Zielsystem geladen werden.
Idempotenz
Eine Eigenschaft von Operationen, die mehrmals ausgeführt werden können, ohne das Ergebnis zu verändern. Im ETL-Kontext bedeutet dies, dass die Ausführung einer Pipeline mehrmals zum identischen Ergebnis führt, auch wenn Datenquellen bereits verarbeitet wurden.
Orchestrierung
Die Koordination verschiedener Tasks und Workflows in einem ETL-Prozess. Tools wie Airflow ermöglichen die Planung, Ausführung und Überwachung komplexer Datenpipelines.

Architektur-Diagramm

flowchart LR
    A[Datenquellen] --> B(Extraktion)
    B --> C[Staging-Area]
    C --> D[Transformation]
    D --> E[Zielsystem]
    F[Airflow] --> B
    F --> D
    G[dbt] --> D
    H[Talend] --> B
    H --> D

Praktische Schritte

  1. Installieren Sie Apache Airflow mit pip in einer virtuellen Umgebung. Dies stellt eine saubere Trennung von Projekten sicher.
  2. python -m venv airflow-env
    source airflow-env/bin/activate
    pip install apache-airflow
  3. Konfigurieren Sie die Airflow-Datenbank mit PostgreSQL für verbesserte Performance und Skalierbarkeit.
  4. export AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:password@localhost/airflow
    airflow db init
  5. Erstellen Sie in dbt ein neues Projekt und konfigurieren Sie die Verbindungsparameter zu Ihrer Staging-Datenbank.
  6. dbt init my_project
    cd my_project
    vim profiles.yml
  7. Definieren Sie in dbt Modelle für Ihre Transformationen mit SQL-Dateien im Ordner models/.
  8. mkdir -p models/staging
    cat > models/staging/customers.sql <<EOF
    select id, name, email from raw.customers
    where email is not null
    EOF</code>
  9. Erstellen Sie in Airflow einen DAG, der dbt aufruft, nachdem die Daten extrahiert wurden.
  10. from airflow.operators.bash import BashOperator
    from airflow import DAG
    from datetime import datetime
    
    with DAG('etl_pipeline', start_date=datetime(2023,1,1)) as dag:
        run_dbt = BashOperator(
            task_id='run_dbt',
            bash_command='dbt run'
        )
  11. Implementieren Sie in Talend eine Job-Komponente zur Datenbereinigung, die doppelte Einträge basierend auf einer eindeutigen ID entfernt.
  12. tRowGenerator --component tUniqueRow --component tFilterRow --component tLogRow
  13. Konfigurieren Sie in Airflow Retry-Policies für kritische Tasks, um vorübergehende Fehler zu handhaben.
  14. run_dbt = BashOperator(
        task_id='run_dbt',
        bash_command='dbt run',
        retries=3,
        retry_delay=timedelta(minutes=5)
    )
  15. Implementieren Sie in dbt Tests zur Sicherstellung der Datenqualität, z.B. Eindeutigkeit von IDs.
  16. cat > models/staging/schema.yml <<EOF
    version: 2
    
    models:
      - name: customers
        columns:
          - name: id
            tests:
              - unique
    EOF</code>
  17. Verwenden Sie in Airflow den Sensor-Operator, um auf das Erscheinen neuer Dateien in einem Verzeichnis zu warten.
  18. from airflow.sensors.filesystem import FileSensor
    
    wait_for_file = FileSensor(
        task_id='wait_for_file',
        filepath='/data/incoming/*.csv',
        poke_interval=60,
        timeout=3600
    )
  19. Konfigurieren Sie in Talend eine Verbindung zu Ihrer Datenbank mit Verbindungspooling für verbesserte Performance.
  20. tDBConnection --component tDBOutput --component tDBCommit

Häufige Fallstricke

Weiterführende Ressourcen

Worked Example: Idempotente ELT-Pipeline für Umsatzdaten mit dbt und Airflow

Situation: Die Firma LogistikSchnell GmbH betreibt ein Cloud-Data-Warehouse (Snowflake). Täglich werden neue orders.csv-Dateien aus dem ERP-System in einen Raw-Bucket geladen. Das Problem: Bei Netzwerkfehlern bricht die ETL-Pipeline ab. Ein manuelles Neustarten führt dazu, dass dieselben Datensätze doppelt geladen werden, was die monatlichen Umsatzberichte verfälscht. Sie müssen eine idempotente ELT-Strecke implementieren, die bei Wiederholung stets zum gleichen, korrekten Zustand führt.

Wir entscheiden uns für den ELT-Ansatz (Extract, Load, Transform), da Snowflake die Transformationslast effizient übernimmt. Als Werkzeug nutzen wir dbt für die SQL-basierte Transformation und Airflow zur Orchestrierung. Der Schlüssel zur Idempotenz liegt in der Verwendung von MERGE-Operationen statt einfacher INSERT-Befehle.

Durchführung

Wir definieren die Schritte zur Implementierung der idempotenten Transformation.

  1. Extraktion und Loading (Raw Layer): Die CSV-Datei wird unverändert in die Tabelle raw_orders geladen. Wir nutzen einen COPY INTO-Befehl, der Append-only ist. Hier ist Duplizierung noch erlaubt, da wir später bereinigen.
  2. Transformation (dbt Model): Wir schreiben ein dbt-Model stg_orders.sql, das die Rohdaten bereinigt. Entscheidend ist die Nutzung von dbt.ref() und logischer Filterung.
  3. Idempotentes Laden (Marts Layer): Das finale Model fct_sales.sql muss sicherstellen, dass alte Daten überschrieben oder gemergt werden, anstatt neue Zeilen anzuhängen.

Sehen wir uns den konkreten dbt-Code für das finale Model an. Wir verwenden Snowflakes MERGE-Syntax, um auf Änderungen zu reagieren:

-- models/marts/fct_sales.sql
{{ config(
    materialized='incremental',
    unique_key='order_id'
) }}

-- Wir lesen aus der staging-Tabelle
WITH source_data AS (
    SELECT * FROM {{ ref('stg_orders') }}
),

-- Nur neue oder geänderte Datensätze auswählen (Delta)
new_records AS (
    SELECT * FROM source_data
    WHERE order_date >= (SELECT COALESCE(MAX(order_date), '1900-01-01') FROM {{ this }})
)

-- Idempotenter Merge: Aktualisiert existierende, fügt neue hinzu
MERGE INTO {{ this }} AS target
USING new_records AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET
    target.amount = source.amount,
    target.status = source.status,
    target.updated_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN INSERT
    (order_id, amount, status, updated_at)
    VALUES
    (source.order_id, source.amount, source.status, CURRENT_TIMESTAMP());

Um dies in Airflow zu orchestrieren, definieren wir die Abhängigkeiten. Wir stellen sicher, dass die Transformation erst beginnt, wenn das Loading abgeschlossen ist.

# dags/elt_logistics.py
from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudExecutionTask
from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudTrigger

# ... (DAG Definition und Python-Operator für das Loading)

# Schritt 1: Loading nach Raw
load_raw = PythonOperator(
    task_id='load_raw_orders',
    python_callable=load_csv_to_snowflake,
)

# Schritt 2: dbt Run (Transformation)
# dbt Cloud Trigger erlaubt asynchrones Warten auf den Job
transform_dbt = DbtCloudTriggerOperator(
    task_id='run_dbt_incremental',
    dbt_cloud_conn_id='dbt_cloud_conn',
    trigger_id=12345, # ID des dbt Cloud Jobs
    wait=True,
)

load_raw >> transform_dbt

Ergebnis

Das Ergebnis ist eine Tabelle fct_sales, die stets den aktuellen Stand der Wahrheit widerspiegelt. Selbst wenn die Pipeline heute, morgen und übermorgen läuft und Daten sich überschneiden, enthält jede order_id genau einen Satz von Fakten. Die Berichtstabelle zeigt für den 15.03.2024 exakt den Betrag von 1.500 €, egal ob der Job einmal oder zehnmal ausgeführt wurde.

Verifikation der Idempotenz
Szenario Aktion Ergebnis in fct_sales Status
Erste Ausführung INSERT neuer Datensatz (ID: 101) 1 Zeile mit ID 101 Erfolg
Neustart (Duplikat) Upsert (Update) für ID 101 1 Zeile mit ID 101 (aktualisiertes Timestamp) Keine Duplikate
Neuer Tag (neue Daten) INSERT neuer Datensatz (ID: 102) 2 Zeilen (ID 101, 102) Konsistent

Reflexion

Mit diesem Ansatz haben Sie verstanden, dass Idempotenz im ELT-Kontext nicht durch das Verhindern von Wiederholungen, sondern durch die logische Behandlung von Duplikaten erreicht wird. Die typische Falle des "Double-Insert" wird durch den unique_key und die MERGE-Logik umgangen. Dies ist essenziell für wartbare Pipelines in KMU-Umgebungen, wo manuelle Korrekturen oft zu Inkonsistenzen führen.

Wissens-Check

Vier Fragen zur Selbstkontrolle. Klicken Sie jede Frage an, um die richtige Antwort und Erklärung zu sehen.

Was ist der Hauptunterschied zwischen ETL und ELT?
  • A) ETL verarbeitet Daten in der Cloud, ELT lokal
  • B) Bei ETL erfolgt die Transformation vor dem Laden, bei ELT nach dem Laden
  • C) ETL nutzt immer Staging-Areas, ELT nicht
  • D) ELT ist nur für Big-Data-Umgebungen geeignet

Richtige Antwort: B. Der wesentliche Unterschied liegt im Zeitpunkt der Transformation: ETL transformiert Daten vor dem Laden ins Zielsystem, während ELT Daten zuerst lädt und dann im Zielsystem transformiert.

Warum ist Idempotenz in ETL-Prozessen wichtig?
  • A) Sie reduziert den Speicherbedarf der Datenbank
  • B) Sie ermöglicht die wiederholte Ausführung von Pipelines ohne Ergebnisänderung
  • C) Sie beschleunigt die Datenextraktion
  • D) Sie ist nur bei ELT-Prozessen relevant

Richtige Antwort: B. Idempotenz stellt sicher, dass die wiederholte Ausführung einer Pipeline zum identischen Ergebnis führt, was für die Datenkonsistenz bei Fehlern oder Neustarts entscheidend ist.

Welche Aufgabe hat eine Staging-Area in einem ETL-Prozess?
  • A) Sie dient als primäre Datenspeicherung für Endanwender
  • B) Sie bereinigt, validiert und bereitet Daten für das Zielsystem vor
  • C) Sie ersetzt das Data Warehouse vollständig
  • D) Sie ist nur für ELT-Prozesse erforderlich

Richtige Antwort: B. Die Staging-Area fungiert als Zwischenstation, in der Daten bereinigt, validiert und transformiert werden, bevor sie ins Zielsystem geladen werden.

Welches Tool wird primär zur Orchestrierung von ETL-Pipelines eingesetzt?
  • A) dbt
  • B) Talend
  • C) Apache Airflow
  • D) SQL

Richtige Antwort: C. Apache Airflow ist speziell für die Orchestrierung komplexer Workflows und Datenpipelines entwickelt, während dbt sich auf die Transformation konzentriert und Talend eine umfassende ETL-Plattform ist.