Sometimes, your dags are so similar that writing a separate file for each of them by hand would be a waste.
Code
We will use a config-file-based approach to create as many dags as we want: everything needed to auto-generate a dag lives in its config file. It supports setting various dag parameters:
- Start date
- Schedule
- Catchup
- And the tasks in the dag
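For reference, here is one way to lay the files out on disk (the generator's file name is illustrative, you can name it whatever you like; the config directory has to match the glob pattern used in the generator code below):

dags/
├── generate_dags.py        # the generator script shown below
└── dag_configs/
    ├── auto_dag_a.json
    └── auto_dag_b.json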
Config files
Config JSON for the first dag
{
  "dag_id": "auto_dag_a",
  "start": "2024-06-15T00:00:00",
  "schedule": "@once",
  "catchup": false,
  "task_ids": ["start", "task_1", "end"]
}
Config JSON for the second dag
{
  "dag_id": "auto_dag_b",
  "start": "2024-06-14T00:00:00",
  "schedule": null,
  "catchup": false,
  "task_ids": ["start", "task_1", "task_2", "end"]
}
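A typo in a config file only surfaces when the scheduler parses the dag folder, so it can be worth sanity-checking a config before deploying it. Here is a minimal sketch (my own addition, not part of the original code; the required-key set mirrors the `DagConfig` fields defined below, and the file path assumes the layout above):

import json

# Keys the generator expects; these mirror the DagConfig dataclass fields.
REQUIRED_KEYS = {"dag_id", "start", "schedule", "catchup", "task_ids"}

with open("dags/dag_configs/auto_dag_a.json") as f:
    cfg = json.load(f)

missing = REQUIRED_KEYS - cfg.keys()
assert not missing, f"config is missing keys: {missing}"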
Generating DAGs
This is the code that generates the dags from the JSON files above. Save it in your dags folder, where the Airflow scheduler can find and parse it.
import glob
import json
from dataclasses import asdict, dataclass
from typing import List, Union

import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from pendulum import DateTime


@dataclass
class DagConfig:
    dag_id: str
    schedule: Union[str, None]
    catchup: bool
    start: Union[str, None]
    task_ids: List[str]

    @property
    def start_date(self) -> DateTime:
        return pendulum.parse(self.start)


def auto_generate_dag(config: DagConfig) -> DAG:
    """Generate and return a dag built from the given config."""
    doc_md = f"""
# Auto generated dag
Generated with this config: ```{asdict(config)}```
"""
    dag = DAG(
        dag_id=config.dag_id,
        description=f"Auto generated dag: {config.dag_id}",
        schedule=config.schedule,
        catchup=config.catchup,
        start_date=config.start_date,
        doc_md=doc_md,
        tags=["auto"],
    )
    return dag


def get_dag_configs() -> List[DagConfig]:
    """Read all dag config json files and return a list of `DagConfig`s."""
    dag_configs = []
    # Note: this path is relative to the scheduler's working directory.
    for file_path in glob.glob("dags/dag_configs/*.json"):
        with open(file_path, "r") as f:
            dag_config_json = json.load(f)
        dag_config = DagConfig(**dag_config_json)
        dag_configs.append(dag_config)
    return dag_configs


def add_tasks_to_dag(dag: DAG, dag_config: DagConfig) -> None:
    """Create an EmptyOperator per task id and chain them sequentially."""
    previous_task = None
    for task_id in dag_config.task_ids:
        task = EmptyOperator(task_id=task_id, dag=dag)
        if previous_task is not None:
            previous_task.set_downstream(task)
        previous_task = task


def generate_dags() -> None:
    """Auto generate dags."""
    dag_configs = get_dag_configs()
    for dag_config in dag_configs:
        dag = auto_generate_dag(dag_config)
        add_tasks_to_dag(dag, dag_config)
        # Expose each dag as a module-level variable so the scheduler picks it up.
        globals()[dag.dag_id] = dag


generate_dags()
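To verify that the generation works without clicking through the UI, you can parse the dag folder the same way the scheduler does. A small sketch (my own addition, assuming the layout above and a local Airflow installation):

from airflow.models import DagBag

# Parse the dag folder exactly the way the scheduler would.
dag_bag = DagBag(dag_folder="dags", include_examples=False)

assert not dag_bag.import_errors, dag_bag.import_errors
assert "auto_dag_a" in dag_bag.dags
assert "auto_dag_b" in dag_bag.dags
# auto_dag_b should contain the four tasks listed in its config.
assert len(dag_bag.get_dag("auto_dag_b").tasks) == 4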
Results
Airflow UI
Now we can see the two dags in the Airflow UI:
First dag’s tasks: start → task_1 → end
Second dag’s tasks: start → task_1 → task_2 → end
Caveats
This code will be executed frequently by the Airflow scheduler, which is what keeps the generated dags up to date. However, you should not run expensive operations here (e.g. network calls or database queries), otherwise the scheduler may not function properly.
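If parse time ever becomes a concern, one knob worth knowing about is the scheduler's re-parse interval, controlled (as of Airflow 2) by `min_file_process_interval` in `airflow.cfg`; the value shown below is the default:

[scheduler]
# Minimum number of seconds between re-parses of the same DAG file.
min_file_process_interval = 30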
Full code
You can check out the full code here. Let me know what you think in the comments.