Aller au contenu principal
Analyse de Données 14 février 2026 12 min de lecture

Tutoriel : Mettre en place une ETL pipeline avec Apache Airflow pour PME

Découvrez comment créer une pipeline ETL robuste avec Apache Airflow pour automatiser vos flux de données en PME sans exploser votre budget tech.

M
Mohamed Boukri

Introduction : Pourquoi Apache Airflow pour votre ETL en PME

L’automatisation des flux de données est devenue un enjeu majeur pour les PME françaises en 2026. Une pipeline ETL (Extract, Transform, Load) permet d’extraire des données de sources multiples, de les transformer selon vos besoins métier, puis de les charger dans un système centralisé pour l’analyse.

Apache Airflow se démarque des solutions propriétaires coûteuses comme Talend ou Informatica par son approche open-source et sa flexibilité totale. Développé initialement par Airbnb, Airflow est devenu la référence pour orchestrer des workflows de données complexes.

Les cas d’usage concrets en PME sont nombreux : consolidation quotidienne de données CRM et ERP, synchronisation automatique entre votre système de facturation et votre outil d’analytics, ou encore agrégation de données commerciales multi-sources pour le reporting. Si vous gérez plusieurs sources de données et passez du temps à faire des exports manuels, ce tutoriel est fait pour vous.

Key Takeaway
Apache Airflow permet d’automatiser vos pipelines de données avec du code Python simple, sans licence coûteuse ni vendor lock-in.

Prérequis techniques et environnement de développement

Avant de commencer, vous aurez besoin de :

  • Docker et Docker Compose installés sur votre machine ou serveur
  • Des connaissances Python de base pour écrire des DAGs (Directed Acyclic Graphs)
  • Un accès aux sources de données que vous souhaitez intégrer : bases SQL, APIs REST, fichiers CSV
  • Un serveur avec minimum 4 Go de RAM et 2 CPU pour faire tourner Airflow confortablement
Si vous débutez avec Docker, consultez la documentation officielle pour l’installer sur votre système d’exploitation.

Installation et configuration d’Apache Airflow avec Docker

La façon la plus simple de démarrer avec Airflow est d’utiliser Docker Compose. On va télécharger le fichier de configuration officiel :

Terminal window
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.8.1/docker-compose.yaml'

Créez ensuite les dossiers nécessaires pour persister les données :

Terminal window
mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env

Le fichier docker-compose.yaml contient déjà les configurations essentielles. Voici les variables d’environnement importantes à connaître :

environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
Mettez AIRFLOW__CORE__LOAD_EXAMPLES à false pour éviter de charger les DAGs d’exemple qui encombrent l’interface.

Initialisez la base de données et lancez les conteneurs :

Terminal window
docker-compose up airflow-init
docker-compose up -d

Accédez à l’interface web sur http://localhost:8080. Les identifiants par défaut sont airflow / airflow. Changez-les immédiatement en production via la CLI :

Terminal window
docker-compose run airflow-worker airflow users create \
--username admin \
--firstname Mohamed \
--lastname Boukri \
--role Admin \
--email contact@kodixar.com

Vérifiez que le scheduler et le webserver tournent correctement avec docker-compose ps. Vous devriez voir tous les services en état “healthy”.

Création de votre premier DAG : extraction de données depuis une API

Un DAG Airflow est un fichier Python qui définit un workflow. Créez un fichier dags/extraction_api.py :

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
import requests
import json
default_args = {
'owner': 'kodixar',
'depends_on_past': False,
'start_date': datetime(2026, 2, 14),
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'extraction_api_commerciale',
default_args=default_args,
description='Extraction quotidienne des données commerciales',
schedule_interval='0 6 * * *', # Tous les jours à 6h
catchup=False,
tags=['etl', 'commercial'],
)
def extraire_donnees_api(**context):
"""Extraction des données depuis l'API commerciale"""
api_url = Variable.get("api_commerciale_url")
api_key = Variable.get("api_commerciale_key")
headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json'
}
response = requests.get(
f"{api_url}/ventes/daily",
headers=headers,
params={'date': context['ds']} # Date d'exécution du DAG
)
response.raise_for_status()
donnees = response.json()
# Stockage dans XCom pour les tâches suivantes
context['ti'].xcom_push(key='donnees_brutes', value=donnees)
print(f"Extraction réussie : {len(donnees)} enregistrements")
return len(donnees)
tache_extraction = PythonOperator(
task_id='extraire_donnees',
python_callable=extraire_donnees_api,
dag=dag,
)

Pour gérer les credentials de façon sécurisée, utilisez les Variables Airflow. Dans l’interface web, allez dans Admin > Variables et créez :

  • api_commerciale_url : l’URL de base de votre API
  • api_commerciale_key : votre clé d’API
Ne stockez jamais vos clés API en dur dans le code source. Utilisez toujours les Variables ou Connections d’Airflow.

Activez le DAG depuis l’interface et déclenchez une exécution manuelle pour tester. Consultez les logs pour vérifier que l’extraction fonctionne correctement.

Transformation des données avec des tâches Python personnalisées

Ajoutons maintenant une tâche de transformation qui nettoie et agrège les données extraites. Installez pandas dans le conteneur Airflow en créant un fichier requirements.txt :

pandas==2.2.0

Puis modifiez le docker-compose.yaml pour installer ces dépendances (section x-airflow-common). Ajoutez cette fonction de transformation dans votre DAG :

import pandas as pd
def transformer_donnees(**context):
"""Transformation et agrégation des données commerciales"""
ti = context['ti']
donnees_brutes = ti.xcom_pull(task_ids='extraire_donnees', key='donnees_brutes')
# Conversion en DataFrame pandas
df = pd.DataFrame(donnees_brutes)
# Nettoyage : suppression des doublons et valeurs nulles
df = df.drop_duplicates(subset=['id_vente'])
df = df.dropna(subset=['montant', 'region'])
# Transformation : calcul du montant TTC
df['montant_ttc'] = df['montant_ht'] * 1.20
# Agrégation par région
agregation = df.groupby('region').agg({
'montant_ttc': 'sum',
'id_vente': 'count'
}).reset_index()
agregation.columns = ['region', 'ca_total', 'nb_ventes']
# Stockage pour la tâche de chargement
donnees_transformees = agregation.to_dict('records')
ti.xcom_push(key='donnees_transformees', value=donnees_transformees)
print(f"Transformation réussie : {len(agregation)} régions")
return len(agregation)
tache_transformation = PythonOperator(
task_id='transformer_donnees',
python_callable=transformer_donnees,
dag=dag,
)
# Définition de la dépendance entre tâches
tache_extraction >> tache_transformation

L’opérateur >> définit l’ordre d’exécution : la transformation ne démarre qu’après le succès de l’extraction. Si l’extraction échoue, Airflow retentera automatiquement 2 fois avec 5 minutes d’intervalle (défini dans default_args).

Utilisez pandas pour manipuler vos données en mémoire. Pour des volumes très importants (>1 Go), privilégiez PySpark ou des transformations SQL directes.

Chargement des données transformées vers une base PostgreSQL

Pour stocker les données transformées, on va utiliser PostgreSQL. Configurez d’abord une connexion dans Airflow (Admin > Connections) :

  • Connection Id : postgres_dwh
  • Connection Type : Postgres
  • Host : postgres (ou l’adresse de votre serveur)
  • Schema : datawarehouse
  • Login : airflow
  • Password : votre mot de passe
  • Port : 5432

Créez la table de destination avec le PostgresOperator :

from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
tache_creation_table = PostgresOperator(
task_id='creer_table_ventes_region',
postgres_conn_id='postgres_dwh',
sql="""
CREATE TABLE IF NOT EXISTS ventes_par_region (
date_extraction DATE,
region VARCHAR(100),
ca_total DECIMAL(12,2),
nb_ventes INTEGER,
PRIMARY KEY (date_extraction, region)
);
""",
dag=dag,
)
def charger_donnees_postgres(**context):
"""Chargement des données dans PostgreSQL"""
ti = context['ti']
donnees = ti.xcom_pull(task_ids='transformer_donnees', key='donnees_transformees')
date_exec = context['ds']
hook = PostgresHook(postgres_conn_id='postgres_dwh')
conn = hook.get_conn()
cursor = conn.cursor()
# Suppression des données existantes pour cette date (idempotence)
cursor.execute(
"DELETE FROM ventes_par_region WHERE date_extraction = %s",
(date_exec,)
)
# Insertion batch des nouvelles données
for ligne in donnees:
cursor.execute(
"""
INSERT INTO ventes_par_region (date_extraction, region, ca_total, nb_ventes)
VALUES (%s, %s, %s, %s)
""",
(date_exec, ligne['region'], ligne['ca_total'], ligne['nb_ventes'])
)
conn.commit()
cursor.close()
print(f"Chargement réussi : {len(donnees)} lignes insérées")
return len(donnees)
tache_chargement = PythonOperator(
task_id='charger_donnees',
python_callable=charger_donnees_postgres,
dag=dag,
)
# Chaînage complet de la pipeline
tache_extraction >> tache_transformation >> tache_creation_table >> tache_chargement

La suppression avant insertion garantit l’idempotence : relancer le DAG pour la même date produira toujours le même résultat. C’est une bonne pratique essentielle en data engineering.

Vérifiez l’intégrité des données en vous connectant à PostgreSQL :

Terminal window
docker-compose exec postgres psql -U airflow -d datawarehouse
SELECT * FROM ventes_par_region ORDER BY date_extraction DESC LIMIT 10;

Planification, monitoring et bonnes pratiques pour la production

Le schedule_interval définit la fréquence d’exécution. Voici les formats courants :

ExpressionSignification
'@daily'Tous les jours à minuit
'0 6 * * *'Tous les jours à 6h
'0 */4 * * *'Toutes les 4 heures
'0 0 * * 1'Tous les lundis à minuit

Pour recevoir des alertes email en cas d’échec, configurez le SMTP dans docker-compose.yaml :

AIRFLOW__SMTP__SMTP_HOST: smtp.gmail.com
AIRFLOW__SMTP__SMTP_PORT: 587
AIRFLOW__SMTP__SMTP_USER: votre-email@gmail.com
AIRFLOW__SMTP__SMTP_PASSWORD: votre-mot-de-passe-app
AIRFLOW__SMTP__SMTP_MAIL_FROM: airflow@kodixar.com

Puis ajoutez l’email dans default_args :

default_args = {
'email': ['contact@kodixar.com'],
'email_on_failure': True,
}

Les logs Airflow sont accessibles depuis l’interface web (bouton “Log” sur chaque tâche) et stockés dans le dossier ./logs. Utilisez-les pour debugger et auditer vos exécutions.

En production, activez l’authentification RBAC et changez tous les mots de passe par défaut.

Bonnes pratiques à respecter :

  • Rendez vos tâches idempotentes : relancer un DAG doit produire le même résultat
  • Utilisez le backfill pour rattraper des exécutions manquées : airflow dags backfill -s 2026-02-01 -e 2026-02-10 extraction_api_commerciale
  • Optimisez le parallélisme en configurant max_active_runs et concurrency dans le DAG
  • Évitez les tâches trop longues : découpez-les en sous-tâches

Pour déployer en production, vous pouvez utiliser Coolify pour orchestrer vos conteneurs Docker ou un serveur dédié avec Docker Compose. Configurez un reverse proxy Nginx pour sécuriser l’accès à l’interface web.

Conclusion et prochaines étapes pour industrialiser vos pipelines

On a vu comment mettre en place une pipeline ETL complète avec Apache Airflow : extraction depuis une API, transformation avec pandas, et chargement dans PostgreSQL. Cette stack est parfaitement adaptée aux besoins des PME qui veulent automatiser leurs flux de données sans investir dans des solutions propriétaires coûteuses.

Les évolutions possibles sont nombreuses. Vous pouvez intégrer dbt (data build tool) pour gérer vos transformations SQL de façon versionnée et testable. La connexion à des outils de visualisation comme Metabase ou Superset permet ensuite d’exploiter vos données consolidées.

Pour approfondir, consultez la documentation officielle d’Airflow et rejoignez la communauté française sur les forums spécialisés. Dans un prochain article, on explorera comment orchestrer des workflows de machine learning avec Airflow et MLflow.

Besoin d’aide pour industrialiser vos pipelines de données ? Chez Kodixar, j’accompagne les PME dans la mise en place de solutions data engineering sur mesure.

Disponible pour de nouveaux projets

Besoin d'aide sur ce sujet ?

Contactez-nous pour discuter de votre projet et voir comment nous pouvons vous aider.

Devis gratuit
Sans engagement
Réponse sous 24h