
Airflow: Dynamic DAGs

What problem(s) it solves

You have a workflow that you want to replicate many times, where each copy is only slightly different.

Without dynamic DAGs, you would need n Python files / DAGs, where n is the number of variations of the workflow.

This gets verbose quickly, creates a lot of duplication, and is hard to maintain.

It basically turns the DAG into a reusable function.

Implementation

The Single File Method

Generating DAGs by using a loop:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task

# This is the DAG generator function. It returns a DAG object.
def create_dag(filename):
    with DAG(f"process_{filename}", start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False) as dag:
        @task
        def extract(filename):
            return filename

        @task
        def process(filename):
            return filename

        @task
        def send_email(filename):
            print(filename)
            return filename

        send_email(process(extract(filename)))
    return dag

# This is where you call the DAG generation function in a loop
for f in ["file_a.csv", "file_b.csv", "file_c.csv"]:

    # You can't just call the function:
    # create_dag(f)

    # You have to assign the result into globals(), because the DAG processor
    # only picks up DAG objects it finds in the module's global namespace.
    globals()[f"dag_{f}"] = create_dag(f)

Pros:

- Everything lives in one file, and adding a new DAG is just adding an entry to the list.
- No extra generation or deployment step is needed.

Cons:

- The DAGs are regenerated every time the scheduler parses the file, which adds parsing overhead as the list grows.
- There is no standalone file per generated DAG, so it is harder to inspect exactly what each one contains.
- Removing or renaming an entry leaves the old DAG behind in the metastore and UI (see the Notes below).


Multi-file Method

The idea is that you have a template file with placeholder values, a config file per DAG (the data/*.json files in the example below), and a script that fills in the placeholder values.

$ tree include
include
├── data
│   ├── file_a.json
│   ├── file_b.json
│   └── file_c.json
├── scripts
│   └── generate_dag.py
└── templates
    └── process_file.py
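
A sketch of what templates/process_file.py could look like, assuming the template mirrors the single-file DAG above and uses literal placeholder strings. The names DAG_ID_PLACEHOLDER and FILENAME_PLACEHOLDER are invented for this sketch, not a fixed convention:

# templates/process_file.py -- sketch only; placeholder names are assumptions
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG("DAG_ID_PLACEHOLDER", start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False) as dag:
    @task
    def extract():
        return "FILENAME_PLACEHOLDER"

    @task
    def process(filename):
        return filename

    @task
    def send_email(filename):
        print(filename)
        return filename

    send_email(process(extract()))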

An admin would just run the script and then deploy the changes. It is idempotent in theory, and you can enforce that within the script. You could also enhance the script to remove DAGs from the metastore that have been deleted.
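
A minimal sketch of what scripts/generate_dag.py could do, assuming each JSON config carries dag_id and filename keys (invented here, to match the made-up placeholders in the template sketch above) and that generated files are written into the project's dags/ folder:

# scripts/generate_dag.py -- sketch only; JSON keys, placeholder names, and output path are assumptions
import json
from pathlib import Path

INCLUDE_DIR = Path(__file__).resolve().parents[1]          # the include/ directory
TEMPLATE = (INCLUDE_DIR / "templates" / "process_file.py").read_text()
OUTPUT_DIR = INCLUDE_DIR.parent / "dags"                   # wherever deployed DAG files live

for config_path in sorted((INCLUDE_DIR / "data").glob("*.json")):
    config = json.loads(config_path.read_text())

    # Fill in the placeholder values from this DAG's config file.
    dag_source = (
        TEMPLATE
        .replace("DAG_ID_PLACEHOLDER", config["dag_id"])
        .replace("FILENAME_PLACEHOLDER", config["filename"])
    )

    # Writing the output unconditionally keeps the script idempotent:
    # re-running it with the same configs always produces the same files.
    out_path = OUTPUT_DIR / f"{config['dag_id']}.py"
    out_path.write_text(dag_source)
    print(f"Wrote {out_path}")

Note that deleting a config file and re-running this sketch would not remove the previously generated DAG file; that cleanup step has to be added explicitly, as mentioned above.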

Pros:

- Each DAG ends up as its own standalone file, so what you see in the repo (and in the UI's code view) is exactly what runs.
- Generation happens before deployment rather than at parse time, so there is no extra scheduler overhead.

Notes

Dynamic DAGs should not be confused with Dynamic Task Mapping.

Dynamic DAGs -> Create DAGs based on predefined values
Dynamic Task Mapping -> Create tasks based on unknown values from previous tasks' output
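
For contrast, here is a minimal sketch of Dynamic Task Mapping (Airflow 2.3+): a single DAG whose number of process tasks is decided at runtime by the output of the previous task, rather than n generated DAGs. The DAG id and task names are made up for the sketch:

from datetime import datetime

from airflow.decorators import dag, task


@dag(start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False)
def mapped_example():
    @task
    def list_files():
        # In a real pipeline this might come from listing a bucket or directory.
        return ["file_a.csv", "file_b.csv", "file_c.csv"]

    @task
    def process(filename):
        print(filename)

    # expand() creates one mapped task instance per element returned at runtime.
    process.expand(filename=list_files())


mapped_example()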


For the "single-file" method, you can create DAGs by registering them with the globals Python context:

for f in ["file_a.csv", "file_b.csv", "file_c.csv"]:
    globals()[f"dag_{f}"] = create_dag(f)

One thing that isn't great about this is that if you change a value in the list, the old DAG still appears in the metastore and UI. In the screenshot below, I changed file_a to file_a.csv and the file_a DAG still exists. It might get cleaned up from the UI when the DAG parser runs again, but the DAG still remains in the metastore!

[Screenshot: the stale file_a DAG still listed in the Airflow UI]
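
If you do end up with stale entries, one option (assuming your deployment exposes the Airflow 2 stable REST API with basic auth enabled) is to delete the DAG's metadata explicitly; the CLI equivalent is `airflow dags delete <dag_id>`. The base URL, credentials, and DAG id below are placeholders:

# Sketch: deleting a stale DAG's metadata via the Airflow 2 stable REST API.
import requests

AIRFLOW_API = "http://localhost:8080/api/v1"   # placeholder base URL
STALE_DAG_ID = "process_file_a"                # placeholder stale DAG id

resp = requests.delete(f"{AIRFLOW_API}/dags/{STALE_DAG_ID}", auth=("admin", "admin"))
resp.raise_for_status()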

#airflow