Airflow: Dynamic Task Mapping
Problem(s) it solves:
When the shape of the upstream data isn't known until run time (e.g., the names of files in an S3 bucket, or the number of files to process), so the number of tasks can't be fixed when the DAG is written.
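The fan-out problem can be sketched in plain Python (not the Airflow API; the `list_files` and `process` functions are made-up stand-ins for upstream and downstream tasks):

```python
# Sketch of the fan-out problem: the number of work items is
# only known at run time, so tasks can't be declared statically.
def list_files():
    # Stand-in for an upstream task, e.g. listing keys in an S3 bucket
    return ["a.csv", "b.csv", "c.csv"]

def process(filename):
    # Stand-in for the per-file work a mapped task would do
    return f"processed {filename}"

# One "task" per file, even though the count was unknown when written
results = [process(f) for f in list_files()]
print(results)
```

Dynamic task mapping gives you this shape at the scheduler level: Airflow creates one task instance per element at run time.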
Implementation
Uses the Operator.expand(arg=[...])
method to generate one mapped task instance per element of the list passed to each keyword argument.
partial()
is the companion operator method for passing constant (unmapped) arguments that are shared by every expanded task instance, alongside the dynamic ones that get expanded.
From the docs,
As well as passing arguments that get expanded at run-time, it is possible to pass arguments that don’t change—in order to clearly differentiate between the two kinds we use different functions, expand() for mapped arguments, and partial() for unmapped ones.
from airflow.decorators import task

@task
def add(x: int, y: int):
    return x + y

added_values = add.partial(y=10).expand(x=[1, 2, 3])
# This results in the add function being expanded to
# add(x=1, y=10)
# add(x=2, y=10)
# add(x=3, y=10)
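The partial/expand split can be mimicked in plain Python with `functools.partial` (a semantic sketch only; Airflow actually creates separate scheduled task instances rather than a list comprehension):

```python
from functools import partial

def add(x: int, y: int) -> int:
    return x + y

# partial() pins the unmapped argument (y=10);
# "expanding" then maps over the remaining argument.
add_ten = partial(add, y=10)
added_values = [add_ten(x=x) for x in [1, 2, 3]]
print(added_values)  # [11, 12, 13]
```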
Configuration
AIRFLOW__CORE__MAX_MAP_LENGTH=1024
-> sets an upper limit (default 1024) on the number of mapped task instances that expand()
can create.
Operator.partial(max_active_tis_per_dag=16).expand(...)
-> caps how many instances of a specific mapped task can run concurrently. This is useful for throttling a large mapped task so it doesn't saturate the worker pool.
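The throttling effect is analogous to running mapped instances through a fixed-size worker pool; a plain-Python sketch (illustrative only, not Airflow's scheduler):

```python
from concurrent.futures import ThreadPoolExecutor

# Analogy: max_active_tis_per_dag=2 behaves like a pool of 2 workers,
# so at most 2 mapped task instances execute at any one time.
def work(item):
    return item * 2

items = list(range(8))
with ThreadPoolExecutor(max_workers=2) as pool:
    # map() preserves input order even with limited concurrency
    results = list(pool.map(work, items))
print(results)
```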
Notes
Dynamic tasks are indicated with square brackets []
in the UI
You can click on the "Mapped Tasks" tab to inspect each expanded task instance
In the task_instance
table in the metastore, there is a column called map_index
- For static tasks, the value is -1
- For dynamic tasks, the value starts at 0 (like an array index)
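A simplified sqlite sketch of how map_index distinguishes static from mapped task instances (the two-column schema is illustrative, not the full Airflow metastore schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (task_id TEXT, map_index INTEGER)")

# A static (unmapped) task gets map_index = -1
conn.execute("INSERT INTO task_instance VALUES ('static_task', -1)")

# A task expanded over 3 inputs gets map_index 0, 1, 2
for i in range(3):
    conn.execute("INSERT INTO task_instance VALUES ('add', ?)", (i,))

rows = conn.execute(
    "SELECT task_id, map_index FROM task_instance ORDER BY map_index"
).fetchall()
print(rows)
```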