Modul 4 von 15 · 📖 10 min Lesezeit · ⏱ 30 min gesamt
FI-DPA 04 ETL-ELT-Strecken
Inhaltsverzeichnis (7 Abschnitte)
FI-DPA 04 ETL/ELT-Strecken
ETL/ELT-Prozesse bilden das Rückgrat moderner Datenintegration. In diesem Modul erlernen Sie die Konzepte zur Extraktion, Transformation und Laden von Daten zwischen verschiedenen Systemen. Sie verstehen, wie Staging-Areas zur Qualitätsicherung eingesetzt werden und warum Idempotenz in der Datenverarbeitung entscheidend ist. Praktische Beispiele mit Airflow, dbt und Talend runden Ihr Wissen ab.
Sie werden in der Lage sein, ETL/ELT-Pipelines zu entwerfen, zu implementieren und zu warten. Der Fokus liegt auf robusten, wartbaren und performanten Lösungen für den Einsatz in KMU-Umgebungen.
Konzepte und Hintergrund
- ETL (Extract, Transform, Load)
- Der klassische Ansatz, bei dem Daten zuerst extrahiert, dann in einer separaten Datenbank transformiert und schließlich in das Zielsystem geladen werden. Die Transformation erfolgt vor dem Laden in das Zielsystem.
- ELT (Extract, Load, Transform)
- Ein moderner Ansatz, bei dem Daten direkt in das Zielsystem (meist ein Data Warehouse) geladen und dort transformiert werden. Dies nutzt die Skalierbarkeit moderner Data-Warehouses und ermöglicht flexiblere Transformationen.
- Staging-Area
- Ein temporärer Speicherbereich, der als Zwischenstation für Daten während des ETL-Prozesses dient. Hier werden Daten bereinigt, validiert und aufbereitet, bevor sie in das Zielsystem geladen werden.
- Idempotenz
- Eine Eigenschaft von Operationen, die mehrmals ausgeführt werden können, ohne das Ergebnis zu verändern. Im ETL-Kontext bedeutet dies, dass die Ausführung einer Pipeline mehrmals zum identischen Ergebnis führt, auch wenn Datenquellen bereits verarbeitet wurden.
- Orchestrierung
- Die Koordination verschiedener Tasks und Workflows in einem ETL-Prozess. Tools wie Airflow ermöglichen die Planung, Ausführung und Überwachung komplexer Datenpipelines.
Architektur-Diagramm
flowchart LR
A[Datenquellen] --> B(Extraktion)
B --> C[Staging-Area]
C --> D[Transformation]
D --> E[Zielsystem]
F[Airflow] --> B
F --> D
G[dbt] --> D
H[Talend] --> B
H --> D
Praktische Schritte
- Installieren Sie Apache Airflow mit pip in einer virtuellen Umgebung. Dies stellt eine saubere Trennung von Projekten sicher.
- Konfigurieren Sie die Airflow-Datenbank mit PostgreSQL für verbesserte Performance und Skalierbarkeit.
- Erstellen Sie in dbt ein neues Projekt und konfigurieren Sie die Verbindungsparameter zu Ihrer Staging-Datenbank.
- Definieren Sie in dbt Modelle für Ihre Transformationen mit SQL-Dateien im Ordner models/.
- Erstellen Sie in Airflow einen DAG, der dbt aufruft, nachdem die Daten extrahiert wurden.
- Implementieren Sie in Talend eine Job-Komponente zur Datenbereinigung, die doppelte Einträge basierend auf einer eindeutigen ID entfernt.
- Konfigurieren Sie in Airflow Retry-Policies für kritische Tasks, um vorübergehende Fehler zu handhaben.
- Implementieren Sie in dbt Tests zur Sicherstellung der Datenqualität, z.B. Eindeutigkeit von IDs.
- Verwenden Sie in Airflow den Sensor-Operator, um auf das Erscheinen neuer Dateien in einem Verzeichnis zu warten.
- Konfigurieren Sie in Talend eine Verbindung zu Ihrer Datenbank mit Verbindungspooling für verbesserte Performance.
python -m venv airflow-env
source airflow-env/bin/activate
pip install apache-airflow
export AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:password@localhost/airflow
airflow db init
dbt init my_project
cd my_project
vim profiles.yml
mkdir -p models/staging
cat > models/staging/customers.sql <<EOF
select id, name, email from raw.customers
where email is not null
EOF</code>
from airflow.operators.bash import BashOperator
from airflow import DAG
from datetime import datetime
with DAG('etl_pipeline', start_date=datetime(2023,1,1)) as dag:
run_dbt = BashOperator(
task_id='run_dbt',
bash_command='dbt run'
)
tRowGenerator --component tUniqueRow --component tFilterRow --component tLogRow
run_dbt = BashOperator(
task_id='run_dbt',
bash_command='dbt run',
retries=3,
retry_delay=timedelta(minutes=5)
)
cat > models/staging/schema.yml <<EOF
version: 2
models:
- name: customers
columns:
- name: id
tests:
- unique
EOF</code>
from airflow.sensors.filesystem import FileSensor
wait_for_file = FileSensor(
task_id='wait_for_file',
filepath='/data/incoming/*.csv',
poke_interval=60,
timeout=3600
)
tDBConnection --component tDBOutput --component tDBCommit
Häufige Fallstricke
Weiterführende Ressourcen
Worked Example: Idempotente ELT-Pipeline für Umsatzdaten mit dbt und Airflow
Situation: Die Firma LogistikSchnell GmbH betreibt ein Cloud-Data-Warehouse (Snowflake). Täglich werden neue orders.csv-Dateien aus dem ERP-System in einen Raw-Bucket geladen. Das Problem: Bei Netzwerkfehlern bricht die ETL-Pipeline ab. Ein manuelles Neustarten führt dazu, dass dieselben Datensätze doppelt geladen werden, was die monatlichen Umsatzberichte verfälscht. Sie müssen eine idempotente ELT-Strecke implementieren, die bei Wiederholung stets zum gleichen, korrekten Zustand führt.
Wir entscheiden uns für den ELT-Ansatz (Extract, Load, Transform), da Snowflake die Transformationslast effizient übernimmt. Als Werkzeug nutzen wir dbt für die SQL-basierte Transformation und Airflow zur Orchestrierung. Der Schlüssel zur Idempotenz liegt in der Verwendung von MERGE-Operationen statt einfacher INSERT-Befehle.
Durchführung
Wir definieren die Schritte zur Implementierung der idempotenten Transformation.
- Extraktion und Loading (Raw Layer): Die CSV-Datei wird unverändert in die Tabelle
raw_ordersgeladen. Wir nutzen einenCOPY INTO-Befehl, der Append-only ist. Hier ist Duplizierung noch erlaubt, da wir später bereinigen. - Transformation (dbt Model): Wir schreiben ein dbt-Model
stg_orders.sql, das die Rohdaten bereinigt. Entscheidend ist die Nutzung vondbt.ref()und logischer Filterung. - Idempotentes Laden (Marts Layer): Das finale Model
fct_sales.sqlmuss sicherstellen, dass alte Daten überschrieben oder gemergt werden, anstatt neue Zeilen anzuhängen.
Sehen wir uns den konkreten dbt-Code für das finale Model an. Wir verwenden Snowflakes MERGE-Syntax, um auf Änderungen zu reagieren:
-- models/marts/fct_sales.sql
{{ config(
materialized='incremental',
unique_key='order_id'
) }}
-- Wir lesen aus der staging-Tabelle
WITH source_data AS (
SELECT * FROM {{ ref('stg_orders') }}
),
-- Nur neue oder geänderte Datensätze auswählen (Delta)
new_records AS (
SELECT * FROM source_data
WHERE order_date >= (SELECT COALESCE(MAX(order_date), '1900-01-01') FROM {{ this }})
)
-- Idempotenter Merge: Aktualisiert existierende, fügt neue hinzu
MERGE INTO {{ this }} AS target
USING new_records AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET
target.amount = source.amount,
target.status = source.status,
target.updated_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN INSERT
(order_id, amount, status, updated_at)
VALUES
(source.order_id, source.amount, source.status, CURRENT_TIMESTAMP());
Um dies in Airflow zu orchestrieren, definieren wir die Abhängigkeiten. Wir stellen sicher, dass die Transformation erst beginnt, wenn das Loading abgeschlossen ist.
# dags/elt_logistics.py
from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudExecutionTask
from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudTrigger
# ... (DAG Definition und Python-Operator für das Loading)
# Schritt 1: Loading nach Raw
load_raw = PythonOperator(
task_id='load_raw_orders',
python_callable=load_csv_to_snowflake,
)
# Schritt 2: dbt Run (Transformation)
# dbt Cloud Trigger erlaubt asynchrones Warten auf den Job
transform_dbt = DbtCloudTriggerOperator(
task_id='run_dbt_incremental',
dbt_cloud_conn_id='dbt_cloud_conn',
trigger_id=12345, # ID des dbt Cloud Jobs
wait=True,
)
load_raw >> transform_dbt
Ergebnis
Das Ergebnis ist eine Tabelle fct_sales, die stets den aktuellen Stand der Wahrheit widerspiegelt. Selbst wenn die Pipeline heute, morgen und übermorgen läuft und Daten sich überschneiden, enthält jede order_id genau einen Satz von Fakten. Die Berichtstabelle zeigt für den 15.03.2024 exakt den Betrag von 1.500 €, egal ob der Job einmal oder zehnmal ausgeführt wurde.
| Szenario | Aktion | Ergebnis in fct_sales |
Status |
|---|---|---|---|
| Erste Ausführung | INSERT neuer Datensatz (ID: 101) | 1 Zeile mit ID 101 | Erfolg |
| Neustart (Duplikat) | Upsert (Update) für ID 101 | 1 Zeile mit ID 101 (aktualisiertes Timestamp) | Keine Duplikate |
| Neuer Tag (neue Daten) | INSERT neuer Datensatz (ID: 102) | 2 Zeilen (ID 101, 102) | Konsistent |
Reflexion
Mit diesem Ansatz haben Sie verstanden, dass Idempotenz im ELT-Kontext nicht durch das Verhindern von Wiederholungen, sondern durch die logische Behandlung von Duplikaten erreicht wird. Die typische Falle des "Double-Insert" wird durch den unique_key und die MERGE-Logik umgangen. Dies ist essenziell für wartbare Pipelines in KMU-Umgebungen, wo manuelle Korrekturen oft zu Inkonsistenzen führen.
Wissens-Check
Vier Fragen zur Selbstkontrolle. Klicken Sie jede Frage an, um die richtige Antwort und Erklärung zu sehen.
Was ist der Hauptunterschied zwischen ETL und ELT?
- A) ETL verarbeitet Daten in der Cloud, ELT lokal
- B) Bei ETL erfolgt die Transformation vor dem Laden, bei ELT nach dem Laden
- C) ETL nutzt immer Staging-Areas, ELT nicht
- D) ELT ist nur für Big-Data-Umgebungen geeignet
Richtige Antwort: B. Der wesentliche Unterschied liegt im Zeitpunkt der Transformation: ETL transformiert Daten vor dem Laden ins Zielsystem, während ELT Daten zuerst lädt und dann im Zielsystem transformiert.
Warum ist Idempotenz in ETL-Prozessen wichtig?
- A) Sie reduziert den Speicherbedarf der Datenbank
- B) Sie ermöglicht die wiederholte Ausführung von Pipelines ohne Ergebnisänderung
- C) Sie beschleunigt die Datenextraktion
- D) Sie ist nur bei ELT-Prozessen relevant
Richtige Antwort: B. Idempotenz stellt sicher, dass die wiederholte Ausführung einer Pipeline zum identischen Ergebnis führt, was für die Datenkonsistenz bei Fehlern oder Neustarts entscheidend ist.
Welche Aufgabe hat eine Staging-Area in einem ETL-Prozess?
- A) Sie dient als primäre Datenspeicherung für Endanwender
- B) Sie bereinigt, validiert und bereitet Daten für das Zielsystem vor
- C) Sie ersetzt das Data Warehouse vollständig
- D) Sie ist nur für ELT-Prozesse erforderlich
Richtige Antwort: B. Die Staging-Area fungiert als Zwischenstation, in der Daten bereinigt, validiert und transformiert werden, bevor sie ins Zielsystem geladen werden.
Welches Tool wird primär zur Orchestrierung von ETL-Pipelines eingesetzt?
- A) dbt
- B) Talend
- C) Apache Airflow
- D) SQL
Richtige Antwort: C. Apache Airflow ist speziell für die Orchestrierung komplexer Workflows und Datenpipelines entwickelt, während dbt sich auf die Transformation konzentriert und Talend eine umfassende ETL-Plattform ist.