Airflow: XComArgs

The TaskFlow API introduces a class called XComArg. It is a key component in passing information from one task to another in an easier and more intuitive way (e.g., without having to call xcom_pull() and xcom_push() yourself). When you use the @task decorator, you are using XComArgs.
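
For example, here is a minimal TaskFlow sketch (the dag_id and function names are made up for illustration) where the return value of one task flows into the next without any explicit xcom_push()/xcom_pull():

from datetime import datetime
from airflow import DAG
from airflow.decorators import task

with DAG(dag_id="taskflow_demo", start_date=datetime(2025, 1, 1), schedule="@once", catchup=False):

    @task
    def extract():
        return 42  # stored as an XCom under the 'return_value' key

    @task
    def load(value):
        print(value)  # prints 42 at runtime

    # extract() returns an XComArg; passing it to load() wires both
    # the data transfer and the extract >> load dependency
    load(extract())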

>>> from airflow.models.xcom_arg import XComArg
>>> from airflow.operators.python import PythonOperator
>>> from airflow.decorators import task
>>> from airflow import DAG
>>> from datetime import datetime
>>> dag = DAG(dag_id="foo", start_date=datetime(2025, 1, 1), schedule="@once")
>>> task1 = PythonOperator(task_id="task1", python_callable=lambda: 42, dag=dag)

# task1 is a Task... nothing special here
>>> task1
<Task(PythonOperator): task1>

# you can encapsulate the task using the XComArg class
>>> xcom = XComArg(task1)
>>> xcom
XComArg(<Task(PythonOperator): task1>)

# printing the XComArg shows the xcom_pull() template it resolves to at runtime
>>> print(xcom)
{{ task_instance.xcom_pull(task_ids='task1', dag_id='foo', key='return_value') }}

# you can customize the key of the xcom you want to pull
>>> print(xcom['some_other_key'])
{{ task_instance.xcom_pull(task_ids='task1', dag_id='foo', key='some_other_key') }}

# you can also get the XComArg from the operator's .output attribute
>>> task1
<Task(PythonOperator): task1>
>>> task1.output
XComArg(<Task(PythonOperator): task1>)
>>> print(task1.output)
{{ task_instance.xcom_pull(task_ids='task1', dag_id='foo', key='return_value') }}

# when you call a decorated function it returns an XComArg
>>> @task
... def start():
...   return 'start'
...
>>> start()
XComArg(<Task(_PythonDecoratedOperator): start>)

# a plain operator instance cannot be called in the same way
>>> task1
<Task(PythonOperator): task1>
>>> task1()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: 'PythonOperator' object is not callable
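
In a DAG, this is what makes .output useful: passing it to a downstream operator both pulls the XCom at runtime and sets the upstream dependency. A minimal sketch (the dag_id and task_ids are made up):

with DAG(dag_id="wiring_demo", start_date=datetime(2025, 1, 1), schedule="@once"):
    producer = PythonOperator(task_id="producer", python_callable=lambda: 42)

    # op_args contains an XComArg, so Airflow pulls the value at runtime
    # and creates the producer >> consumer dependency automatically
    consumer = PythonOperator(
        task_id="consumer",
        python_callable=lambda value: print(value),  # prints 42
        op_args=[producer.output],
    )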

XComArg has two important methods: map() and zip().

map() lets you apply a Python function to every item in a task's output:

XComArg.map(python_func)

example:

with DAG('map_dag', start_date=datetime(2025, 1, 1), schedule='@once'):
    start = PythonOperator(
        task_id='start',
        python_callable=lambda: ['/usr/folder_a/', '/usr/folder_b/', '/usr/folder_c/']
    )

    # a named function is safer here than a lambda, since map() callables
    # may be referenced by name when the DAG is serialized
    def append_data(path):
        return path + 'data/'

    # append 'data/' to every item in the start task's output
    new_list = start.output.map(append_data)

    end = PythonOperator(
        task_id='end',
        python_callable=lambda new_list: print([path for path in new_list]),
        op_args=[new_list]
    )
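
At runtime the transformation is applied lazily when the downstream task pulls the XCom, so the end task prints something equivalent to this plain Python:

paths = ['/usr/folder_a/', '/usr/folder_b/', '/usr/folder_c/']
print([path + 'data/' for path in paths])
# ['/usr/folder_a/data/', '/usr/folder_b/data/', '/usr/folder_c/data/']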

zip() works like Python's built-in zip() function: it combines the items of several task outputs into tuples, element by element:

with DAG('zip_dag', start_date=datetime(2025, 1, 1), schedule='@once', catchup=False):

    @task
    def get_path():
        return ['/usr/local/', '/bin/test/', '/home/me/']

    get_filenames = PythonOperator(
        task_id='get_filenames',
        python_callable=lambda: ['file_a', 'file_b', 'file_c']
    )

    @task
    def get_extensions():
        return ['.txt', '.zip', '.parquet']

    # each positional arg is one zipped tuple: (path, filename, extension)
    download = PythonOperator(
        task_id='download',
        python_callable=lambda a, b, c: print(f'{a} {b} {c}'),
        op_args=get_path().zip(get_filenames.output, get_extensions())
    )
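
For reference, the zipped value that the download task unpacks is equivalent to what Python's own zip() would produce:

paths = ['/usr/local/', '/bin/test/', '/home/me/']
filenames = ['file_a', 'file_b', 'file_c']
extensions = ['.txt', '.zip', '.parquet']
print(list(zip(paths, filenames, extensions)))
# [('/usr/local/', 'file_a', '.txt'), ('/bin/test/', 'file_b', '.zip'), ('/home/me/', 'file_c', '.parquet')]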

#airflow