Airflow: XComArgs
The TaskFlow API introduces a class called XComArg. It is a key component for passing information from one task to another in an easier and more intuitive way (i.e., without calling xcom_pull() and xcom_push() yourself). When you use the @task decorator, you are using XComArgs.
>>> from airflow.models.xcom_arg import XComArg
>>> from airflow.operators.python import PythonOperator
>>> from airflow.decorators import task
>>> from airflow import DAG
>>> from datetime import datetime
>>> dag = DAG(dag_id="foo", start_date=datetime(2025, 1, 1), schedule="@once")
>>> task1 = PythonOperator(task_id="task1", python_callable=lambda: 42, dag=dag)
# task1 is an ordinary operator instance... nothing special here
>>> task1
<Task(PythonOperator): task1>
# you can wrap the task in the XComArg class
>>> xcom = XComArg(task1)
>>> xcom
XComArg(<Task(PythonOperator): task1>)
# its string form is the Jinja template that pulls the task's return value via xcom_pull
>>> print(xcom)
{{ task_instance.xcom_pull(task_ids='task1', dag_id='foo', key='return_value') }}
# you can customize the key of the xcom you want to pull
>>> print(xcom['some_other_key'])
{{ task_instance.xcom_pull(task_ids='task1', dag_id='foo', key='some_other_key') }}
# every operator also exposes the same XComArg through its .output property
>>> task1
<Task(PythonOperator): task1>
>>> task1.output
XComArg(<Task(PythonOperator): task1>)
>>> print(task1.output)
{{ task_instance.xcom_pull(task_ids='task1', dag_id='foo', key='return_value') }}
# when you call a decorated function it returns an XComArg
>>> @task
... def start():
...     return 'start'
...
>>> start()
XComArg(<Task(_PythonDecoratedOperator): start>)
# a classic operator instance, by contrast, cannot be called this way
>>> task1
<Task(PythonOperator): task1>
>>> task1()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: 'PythonOperator' object is not callable
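To make the printed output in the session less magical, here is a minimal pure-Python model of what it shows: an XComArg behaves like a lazy reference to a (task, key) pair, indexing it swaps the key, and str() renders the Jinja xcom_pull expression. ToyXComArg is a hypothetical class written only for this illustration; it is not Airflow's implementation and leaves out dependency tracking and runtime resolution.

```python
from dataclasses import dataclass

RETURN_KEY = "return_value"  # default key seen in the session output


@dataclass(frozen=True)
class ToyXComArg:
    """Hypothetical stand-in for an XComArg: it only records which task
    (and which XCom key) a downstream consumer should pull from."""

    task_id: str
    dag_id: str
    key: str = RETURN_KEY

    def __getitem__(self, key: str) -> "ToyXComArg":
        # Indexing returns a new reference to the same task under another key.
        return ToyXComArg(self.task_id, self.dag_id, key)

    def __str__(self) -> str:
        # Render the Jinja expression that would pull the value at runtime.
        return (
            "{{ task_instance.xcom_pull("
            f"task_ids='{self.task_id}', dag_id='{self.dag_id}', key='{self.key}'"
            ") }}"
        )


xcom = ToyXComArg("task1", "foo")
print(xcom)
print(xcom["some_other_key"])
```

The real class does much more, but this captures why printing an XComArg yields a template string rather than the value itself: nothing is pulled until the template is rendered inside a running task.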
XComArg has two important methods: map() and zip().
map() lets you apply a Python function to every item in the list that a task returns:
XComArg.map(python_func)
For example:
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG('map_dag', start_date=datetime(2025, 1, 1), schedule='@once'):
    start = PythonOperator(
        task_id='start',
        python_callable=lambda: ['/usr/folder_a/', '/usr/folder_b/', '/usr/folder_c/'],
    )
    # append "data/" to every item in the list returned by the start task
    new_list = start.output.map(lambda path: path + 'data/')
    # passing the XComArg through op_args also makes end downstream of start
    end = PythonOperator(
        task_id='end',
        python_callable=lambda new_list: print(list(new_list)),
        op_args=[new_list],
    )
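To see what the end task actually receives, here is a small stdlib-only model of the mapped value. My understanding is that Airflow resolves start.output.map(...) to a lazy sequence that applies the function only when an item is read, rather than building a transformed list up front; LazyMapResult is a hypothetical class written to mirror that behavior, not Airflow's own code.

```python
from collections.abc import Callable, Sequence
from typing import Any


class LazyMapResult(Sequence):
    """Hypothetical model of a mapped XCom value: the upstream list is kept
    as-is and the function runs only when an element is read."""

    def __init__(self, values: Sequence[Any], func: Callable[[Any], Any]) -> None:
        self._values = values
        self._func = func

    def __len__(self) -> int:
        # Length comes from the upstream list; the function is not called.
        return len(self._values)

    def __getitem__(self, index):
        if isinstance(index, slice):
            return [self._func(v) for v in self._values[index]]
        # An out-of-range index raises IndexError here, which also ends iteration.
        return self._func(self._values[index])


# What the 'start' task returns in the example above.
upstream = ['/usr/folder_a/', '/usr/folder_b/', '/usr/folder_c/']

new_list = LazyMapResult(upstream, lambda path: path + 'data/')

# Sequence provides __iter__ on top of __getitem__ and __len__.
print(list(new_list))
```

Because the model is a Sequence, code in the end task that iterates or indexes the value works unchanged, and large upstream lists are not copied or transformed until they are consumed.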
zip() works like Python's built-in zip(): it combines the outputs of several tasks element by element into tuples:
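from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator

with DAG('dag', start_date=datetime(2022, 1, 1), schedule='@once', catchup=False):
    @task
    def get_path():
        return ['/usr/local/', '/bin/test/', '/home/me/']

    get_filenames = PythonOperator(
        task_id='get_filenames',
        python_callable=lambda: ['file_a', 'file_b', 'file_c'],
    )

    @task
    def get_extensions():
        return ['.txt', '.zip', '.parquet']

    # op_args resolves to the zipped tuples (path, filename, extension), and
    # PythonOperator passes each tuple as a separate positional argument
    download = PythonOperator(
        task_id='download',
        python_callable=lambda *files: print([f'{path}{name}{ext}' for path, name, ext in files]),
        op_args=get_path().zip(get_filenames.output, get_extensions()),
    )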
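The zip example is easier to follow once you see what op_args resolves to at runtime. The sketch below models that with plain Python lists standing in for the three upstream outputs. It assumes, as I understand Airflow's behavior, that the zipped value is a sequence of tuples (truncated to the shortest input, like built-in zip()) and that PythonOperator unpacks op_args into positional arguments; download here is a hypothetical stand-in for the task's callable.

```python
# Outputs of the three upstream tasks in the zip example above.
paths = ['/usr/local/', '/bin/test/', '/home/me/']
filenames = ['file_a', 'file_b', 'file_c']
extensions = ['.txt', '.zip', '.parquet']

# Modeled resolution of get_path().zip(get_filenames.output, get_extensions()):
# the i-th items of each list are grouped into one tuple, like built-in zip().
zipped = list(zip(paths, filenames, extensions))


def download(*rows):
    # Each positional argument is one (path, filename, extension) tuple,
    # mirroring python_callable(*op_args).
    return [f'{path}{name}{ext}' for path, name, ext in rows]


print(download(*zipped))

# Like built-in zip(), the result stops at the shortest input.
short = list(zip(paths, ['file_a']))
print(short)
```

This is also why the callable's parameters should be thought of as tuples, not as individual file names: with three upstream items you get three tuples, one per positional argument.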