Introduction : Pourquoi Apache Airflow pour votre ETL en PME
L’automatisation des flux de données est devenue un enjeu majeur pour les PME françaises en 2026. Une pipeline ETL (Extract, Transform, Load) permet d’extraire des données de sources multiples, de les transformer selon vos besoins métier, puis de les charger dans un système centralisé pour l’analyse.
Apache Airflow se démarque des solutions propriétaires coûteuses comme Talend ou Informatica par son approche open-source et sa flexibilité totale. Développé initialement par Airbnb, Airflow est devenu la référence pour orchestrer des workflows de données complexes.
Les cas d’usage concrets en PME sont nombreux : consolidation quotidienne de données CRM et ERP, synchronisation automatique entre votre système de facturation et votre outil d’analytics, ou encore agrégation de données commerciales multi-sources pour le reporting. Si vous gérez plusieurs sources de données et passez du temps à faire des exports manuels, ce tutoriel est fait pour vous.
Prérequis techniques et environnement de développement
Avant de commencer, vous aurez besoin de :
- Docker et Docker Compose installés sur votre machine ou serveur
- Des connaissances Python de base pour écrire des DAGs (Directed Acyclic Graphs)
- Un accès aux sources de données que vous souhaitez intégrer : bases SQL, APIs REST, fichiers CSV
- Un serveur avec minimum 4 Go de RAM et 2 CPU pour faire tourner Airflow confortablement
Installation et configuration d’Apache Airflow avec Docker
La façon la plus simple de démarrer avec Airflow est d’utiliser Docker Compose. On va télécharger le fichier de configuration officiel :
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.8.1/docker-compose.yaml'Créez ensuite les dossiers nécessaires pour persister les données :
mkdir -p ./dags ./logs ./plugins ./configecho -e "AIRFLOW_UID=$(id -u)" > .envLe fichier docker-compose.yaml contient déjà les configurations essentielles. Voici les variables d’environnement importantes à connaître :
environment: AIRFLOW__CORE__EXECUTOR: CeleryExecutor AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow AIRFLOW__CORE__FERNET_KEY: '' AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true' AIRFLOW__CORE__LOAD_EXAMPLES: 'false'AIRFLOW__CORE__LOAD_EXAMPLES à false pour éviter de charger les DAGs d’exemple qui encombrent l’interface. Initialisez la base de données et lancez les conteneurs :
docker-compose up airflow-initdocker-compose up -dAccédez à l’interface web sur http://localhost:8080. Les identifiants par défaut sont airflow / airflow. Changez-les immédiatement en production via la CLI :
docker-compose run airflow-worker airflow users create \ --username admin \ --firstname Mohamed \ --lastname Boukri \ --role Admin \ --email contact@kodixar.comVérifiez que le scheduler et le webserver tournent correctement avec docker-compose ps. Vous devriez voir tous les services en état “healthy”.
Création de votre premier DAG : extraction de données depuis une API
Un DAG Airflow est un fichier Python qui définit un workflow. Créez un fichier dags/extraction_api.py :
from datetime import datetime, timedeltafrom airflow import DAGfrom airflow.operators.python import PythonOperatorfrom airflow.models import Variableimport requestsimport json
default_args = { 'owner': 'kodixar', 'depends_on_past': False, 'start_date': datetime(2026, 2, 14), 'email_on_failure': True, 'email_on_retry': False, 'retries': 2, 'retry_delay': timedelta(minutes=5),}
dag = DAG( 'extraction_api_commerciale', default_args=default_args, description='Extraction quotidienne des données commerciales', schedule_interval='0 6 * * *', # Tous les jours à 6h catchup=False, tags=['etl', 'commercial'],)
def extraire_donnees_api(**context): """Extraction des données depuis l'API commerciale""" api_url = Variable.get("api_commerciale_url") api_key = Variable.get("api_commerciale_key")
headers = { 'Authorization': f'Bearer {api_key}', 'Content-Type': 'application/json' }
response = requests.get( f"{api_url}/ventes/daily", headers=headers, params={'date': context['ds']} # Date d'exécution du DAG )
response.raise_for_status() donnees = response.json()
# Stockage dans XCom pour les tâches suivantes context['ti'].xcom_push(key='donnees_brutes', value=donnees)
print(f"Extraction réussie : {len(donnees)} enregistrements") return len(donnees)
tache_extraction = PythonOperator( task_id='extraire_donnees', python_callable=extraire_donnees_api, dag=dag,)Pour gérer les credentials de façon sécurisée, utilisez les Variables Airflow. Dans l’interface web, allez dans Admin > Variables et créez :
api_commerciale_url: l’URL de base de votre APIapi_commerciale_key: votre clé d’API
Activez le DAG depuis l’interface et déclenchez une exécution manuelle pour tester. Consultez les logs pour vérifier que l’extraction fonctionne correctement.
Transformation des données avec des tâches Python personnalisées
Ajoutons maintenant une tâche de transformation qui nettoie et agrège les données extraites. Installez pandas dans le conteneur Airflow en créant un fichier requirements.txt :
pandas==2.2.0Puis modifiez le docker-compose.yaml pour installer ces dépendances (section x-airflow-common). Ajoutez cette fonction de transformation dans votre DAG :
import pandas as pd
def transformer_donnees(**context): """Transformation et agrégation des données commerciales""" ti = context['ti'] donnees_brutes = ti.xcom_pull(task_ids='extraire_donnees', key='donnees_brutes')
# Conversion en DataFrame pandas df = pd.DataFrame(donnees_brutes)
# Nettoyage : suppression des doublons et valeurs nulles df = df.drop_duplicates(subset=['id_vente']) df = df.dropna(subset=['montant', 'region'])
# Transformation : calcul du montant TTC df['montant_ttc'] = df['montant_ht'] * 1.20
# Agrégation par région agregation = df.groupby('region').agg({ 'montant_ttc': 'sum', 'id_vente': 'count' }).reset_index()
agregation.columns = ['region', 'ca_total', 'nb_ventes']
# Stockage pour la tâche de chargement donnees_transformees = agregation.to_dict('records') ti.xcom_push(key='donnees_transformees', value=donnees_transformees)
print(f"Transformation réussie : {len(agregation)} régions") return len(agregation)
tache_transformation = PythonOperator( task_id='transformer_donnees', python_callable=transformer_donnees, dag=dag,)
# Définition de la dépendance entre tâchestache_extraction >> tache_transformationL’opérateur >> définit l’ordre d’exécution : la transformation ne démarre qu’après le succès de l’extraction. Si l’extraction échoue, Airflow retentera automatiquement 2 fois avec 5 minutes d’intervalle (défini dans default_args).
Chargement des données transformées vers une base PostgreSQL
Pour stocker les données transformées, on va utiliser PostgreSQL. Configurez d’abord une connexion dans Airflow (Admin > Connections) :
- Connection Id :
postgres_dwh - Connection Type : Postgres
- Host :
postgres(ou l’adresse de votre serveur) - Schema :
datawarehouse - Login :
airflow - Password : votre mot de passe
- Port :
5432
Créez la table de destination avec le PostgresOperator :
from airflow.providers.postgres.operators.postgres import PostgresOperatorfrom airflow.providers.postgres.hooks.postgres import PostgresHook
tache_creation_table = PostgresOperator( task_id='creer_table_ventes_region', postgres_conn_id='postgres_dwh', sql=""" CREATE TABLE IF NOT EXISTS ventes_par_region ( date_extraction DATE, region VARCHAR(100), ca_total DECIMAL(12,2), nb_ventes INTEGER, PRIMARY KEY (date_extraction, region) ); """, dag=dag,)
def charger_donnees_postgres(**context): """Chargement des données dans PostgreSQL""" ti = context['ti'] donnees = ti.xcom_pull(task_ids='transformer_donnees', key='donnees_transformees') date_exec = context['ds']
hook = PostgresHook(postgres_conn_id='postgres_dwh') conn = hook.get_conn() cursor = conn.cursor()
# Suppression des données existantes pour cette date (idempotence) cursor.execute( "DELETE FROM ventes_par_region WHERE date_extraction = %s", (date_exec,) )
# Insertion batch des nouvelles données for ligne in donnees: cursor.execute( """ INSERT INTO ventes_par_region (date_extraction, region, ca_total, nb_ventes) VALUES (%s, %s, %s, %s) """, (date_exec, ligne['region'], ligne['ca_total'], ligne['nb_ventes']) )
conn.commit() cursor.close()
print(f"Chargement réussi : {len(donnees)} lignes insérées") return len(donnees)
tache_chargement = PythonOperator( task_id='charger_donnees', python_callable=charger_donnees_postgres, dag=dag,)
# Chaînage complet de la pipelinetache_extraction >> tache_transformation >> tache_creation_table >> tache_chargementLa suppression avant insertion garantit l’idempotence : relancer le DAG pour la même date produira toujours le même résultat. C’est une bonne pratique essentielle en data engineering.
Vérifiez l’intégrité des données en vous connectant à PostgreSQL :
docker-compose exec postgres psql -U airflow -d datawarehouseSELECT * FROM ventes_par_region ORDER BY date_extraction DESC LIMIT 10;Planification, monitoring et bonnes pratiques pour la production
Le schedule_interval définit la fréquence d’exécution. Voici les formats courants :
| Expression | Signification |
|---|---|
'@daily' | Tous les jours à minuit |
'0 6 * * *' | Tous les jours à 6h |
'0 */4 * * *' | Toutes les 4 heures |
'0 0 * * 1' | Tous les lundis à minuit |
Pour recevoir des alertes email en cas d’échec, configurez le SMTP dans docker-compose.yaml :
AIRFLOW__SMTP__SMTP_HOST: smtp.gmail.comAIRFLOW__SMTP__SMTP_PORT: 587AIRFLOW__SMTP__SMTP_USER: votre-email@gmail.comAIRFLOW__SMTP__SMTP_PASSWORD: votre-mot-de-passe-appAIRFLOW__SMTP__SMTP_MAIL_FROM: airflow@kodixar.comPuis ajoutez l’email dans default_args :
default_args = { 'email': ['contact@kodixar.com'], 'email_on_failure': True,}Les logs Airflow sont accessibles depuis l’interface web (bouton “Log” sur chaque tâche) et stockés dans le dossier ./logs. Utilisez-les pour debugger et auditer vos exécutions.
Bonnes pratiques à respecter :
- Rendez vos tâches idempotentes : relancer un DAG doit produire le même résultat
- Utilisez le backfill pour rattraper des exécutions manquées :
airflow dags backfill -s 2026-02-01 -e 2026-02-10 extraction_api_commerciale - Optimisez le parallélisme en configurant
max_active_runsetconcurrencydans le DAG - Évitez les tâches trop longues : découpez-les en sous-tâches
Pour déployer en production, vous pouvez utiliser Coolify pour orchestrer vos conteneurs Docker ou un serveur dédié avec Docker Compose. Configurez un reverse proxy Nginx pour sécuriser l’accès à l’interface web.
Conclusion et prochaines étapes pour industrialiser vos pipelines
On a vu comment mettre en place une pipeline ETL complète avec Apache Airflow : extraction depuis une API, transformation avec pandas, et chargement dans PostgreSQL. Cette stack est parfaitement adaptée aux besoins des PME qui veulent automatiser leurs flux de données sans investir dans des solutions propriétaires coûteuses.
Les évolutions possibles sont nombreuses. Vous pouvez intégrer dbt (data build tool) pour gérer vos transformations SQL de façon versionnée et testable. La connexion à des outils de visualisation comme Metabase ou Superset permet ensuite d’exploiter vos données consolidées.
Pour approfondir, consultez la documentation officielle d’Airflow et rejoignez la communauté française sur les forums spécialisés. Dans un prochain article, on explorera comment orchestrer des workflows de machine learning avec Airflow et MLflow.
Besoin d’aide pour industrialiser vos pipelines de données ? Chez Kodixar, j’accompagne les PME dans la mise en place de solutions data engineering sur mesure.