onprema

Airflow: Dynamic Task Mapping

Problem(s) it solves:

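When a task's inputs are only known at runtime because they come from an upstream task (e.g., the names of files in an S3 bucket, the number of files to process, etc.). Dynamic task mapping creates the right number of task instances at runtime, so the DAG author doesn't need to know the count in advance.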

Implementation

Uses the Operator.expand(arg=[...]) method to generate one mapped task instance per element of the list passed for arg.

partial() is an operator method that sets the static arguments, which stay the same for every mapped task instance, while expand() sets the dynamic ones that get expanded.

From the docs,

As well as passing arguments that get expanded at run-time, it is possible to pass arguments that don’t change—in order to clearly differentiate between the two kinds we use different functions, expand() for mapped arguments, and partial() for unmapped ones.

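from airflow.decorators import task

@task
def add(x: int, y: int):
    return x + y

# y is fixed for every mapped task instance; x is expanded, one instance per element
added_values = add.partial(y=10).expand(x=[1, 2, 3])

# This results in the add function being expanded to
# add(x=1, y=10)
# add(x=2, y=10)
# add(x=3, y=10)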
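To make the partial()/expand() split concrete without a running Airflow instance, here is a plain-Python model of the expansion. It is only a sketch: expand_kwargs is a hypothetical helper, not part of Airflow, and it assumes that expanding over several arguments yields every combination (the "cross product" behavior the docs describe). The ordering of the combinations here is an artifact of itertools.product and should not be read as Airflow's map_index ordering.

```python
from itertools import product


def expand_kwargs(static_kwargs, mapped_kwargs):
    """Model of partial().expand(): one kwargs dict per mapped task instance.

    static_kwargs plays the role of partial(), mapped_kwargs the role of expand().
    Hypothetical helper for illustration only; not an Airflow API.
    """
    names = list(mapped_kwargs)
    combos = product(*(mapped_kwargs[name] for name in names))
    return [{**static_kwargs, **dict(zip(names, combo))} for combo in combos]


def add(x, y):
    return x + y


# Same shape as add.partial(y=10).expand(x=[1, 2, 3])
calls = expand_kwargs({"y": 10}, {"x": [1, 2, 3]})
results = [add(**kwargs) for kwargs in calls]
print(calls)
print(results)

# Expanding over two arguments produces every combination of them
cross = expand_kwargs({}, {"x": [1, 2], "y": [10, 20]})
print(len(cross))
```

The static arguments are copied into every call unchanged, which is why they belong in partial() and not in expand().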

Configuration

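AIRFLOW__CORE__MAX_MAP_LENGTH=1024 -> sets a limit on the number of mapped task instances that a single expand() can create (1024 is the default). If the upstream task returns a longer list, the task fails.

Operator.partial(max_active_tis_per_dag=16).expand(...) -> sets a limit on how many task instances of a specific mapped task can run at the same time. This applies across all active DAG runs, not just one.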
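One way to stay under this limit is to batch the upstream list so that each mapped task instance handles a chunk and not a single item. The sketch below is plain Python and makes a few assumptions: max_map_length and chunk are hypothetical helpers, the limit is read only from the environment variable (a real deployment may also set it in airflow.cfg), and the file names are made up.

```python
import math
import os

DEFAULT_MAX_MAP_LENGTH = 1024  # Airflow's default for [core] max_map_length


def max_map_length(environ=os.environ):
    """Read the limit from the environment, falling back to the default."""
    return int(environ.get("AIRFLOW__CORE__MAX_MAP_LENGTH", DEFAULT_MAX_MAP_LENGTH))


def chunk(items, limit):
    """Split items into at most `limit` batches of near-equal size."""
    if not items:
        return []
    size = math.ceil(len(items) / limit)
    return [items[i:i + size] for i in range(0, len(items), size)]


# Hypothetical upstream output: more files than the limit allows
files = [f"file_{i}.csv" for i in range(2500)]
limit = max_map_length({})  # empty mapping, so the default of 1024 is used
batches = chunk(files, limit)
print(len(batches), len(batches[0]))
```

An upstream task would return batches in place of files, and the mapped task would loop over the items in its batch.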

Notes

Dynamic tasks are indicated with square brackets [] in the UI

You can click on "Mapped Tasks" to inspect each dynamic task

In the task_instance table in the metastore, there is a column called map_index
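To show how map_index distinguishes mapped task instances, here is a sketch that queries a stand-in table in an in-memory SQLite database. The table is a simplified assumption: the real task_instance table has many more columns, and the DAG, task, and run names below are made up. The one detail taken from Airflow is that task instances that are not mapped have a map_index of -1, while mapped ones count up from 0.

```python
import sqlite3

# Simplified stand-in for the metastore's task_instance table (assumption)
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance ("
    "dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER, state TEXT)"
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?, ?)",
    [
        ("example_dag", "list_files", "run_1", -1, "success"),  # not mapped
        ("example_dag", "add", "run_1", 0, "success"),
        ("example_dag", "add", "run_1", 1, "failed"),
        ("example_dag", "add", "run_1", 2, "success"),
    ],
)


def mapped_instances(conn, dag_id, task_id, run_id):
    """Return (map_index, state) for each mapped instance of one task in one run."""
    cursor = conn.execute(
        "SELECT map_index, state FROM task_instance "
        "WHERE dag_id = ? AND task_id = ? AND run_id = ? AND map_index >= 0 "
        "ORDER BY map_index",
        (dag_id, task_id, run_id),
    )
    return cursor.fetchall()


result = mapped_instances(conn, "example_dag", "add", "run_1")
print(result)
```

Filtering on map_index >= 0 leaves out ordinary task instances, which makes it easy to find which mapped instance of a task failed.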

#airflow