0
votes

I am trying to return a list of values from an Airflow task and then, in another task, loop over that list and call a BashOperator for each value, passing the value as an argument to a Python script.

If using a PythonOperator is the correct approach, I would be interested in seeing how to do that. However, I want the Python script I call to be an external file, not another callable in my Airflow DAG.

I can loop over the list and print each value; however, I can't figure out how to call the BashOperator that I have placed inside the loop.

Here is a simple version of the code, just attempting to echo the value:

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime


# DAG object that every task in this file attaches itself to via dag=DAG.
# NOTE(review): rebinding the name DAG to the instance shadows the imported
# DAG class for the rest of the module; the tasks below rely on the name
# referring to this instance.
# NOTE(review): datetime.now() is evaluated every time this file is imported,
# so start_date moves on each parse. Airflow recommends a fixed start_date
# (the answer below uses datetime(2020, 1, 1)).
DAG = DAG(
    dag_id='locations_test',
    start_date=datetime.now(),
    schedule_interval='@once'
)


def get_locations_function(**kwargs):
    """Return the hard-coded list of locations to process.

    The returned list is what the downstream task reads back with
    ``ti.xcom_pull(task_ids='get_locations_task')``. The task context passed
    in through ``kwargs`` is not used.
    """
    return ['one', 'two', 'three']


# Upstream task: runs get_locations_function and exposes its return value
# to downstream tasks through XCom.
get_locations_task = PythonOperator(
    dag=DAG,
    task_id='get_locations_task',
    provide_context=True,  # hand the task context to the callable as **kwargs
    python_callable=get_locations_function,
)


def call_loop_over_locations_function(**kwargs):
    """Pull the locations from XCom and build a BashOperator for each one.

    NOTE(review): the BashOperator objects are only constructed here; nothing
    in this function runs them, which is why the echo never happens.
    Calling ``bash_operator.execute(context=kwargs)`` inside the loop (as the
    answer does) runs each command inline within this task.
    """
    ti = kwargs['ti']
    # The list returned by get_locations_task.
    locations = ti.xcom_pull(task_ids='get_locations_task')
    for loc in locations:
        print(loc) #this prints
        # NOTE(review): every iteration reuses task_id='do_things_with_location'
        # and attaches a new operator to the same DAG object.
        bash_operator = BashOperator(
            task_id='do_things_with_location',
            bash_command="echo '%s'" %loc,
            xcom_push=True,
            dag=DAG)

    #this doesn't get called, i have also tried 
    #call_loop_over_locations_task >> bash_operator

    # NOTE(review): a bare name is a no-op expression and does not run the
    # operator; it also raises UnboundLocalError when `locations` is empty,
    # because bash_operator is only assigned inside the loop.
    bash_operator 


# Downstream task that loops over the pulled locations.
call_loop_over_locations_task = PythonOperator(
    dag=DAG,
    task_id='call_loop_over_locations_task',
    provide_context=True,  # the callable reads `ti` from **kwargs
    python_callable=call_loop_over_locations_function,
)

# The loop must only start once get_locations_task has pushed its list.
get_locations_task.set_downstream(call_loop_over_locations_task)
1

1 Answer

0
votes

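The `**kwargs` received by your `call_loop_over_locations_function()` is the task's execution context. Creating a BashOperator inside the loop only builds the object and does not run it. You can run it directly by passing that context to the operator's `execute()` method: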

code

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime


# DAG object shared by every task below (they pass it as dag=DAG).
# A fixed start_date keeps the DAG definition stable across parses, and
# schedule_interval=None means the DAG is never scheduled automatically.
# NOTE: the name DAG is rebound to this instance, shadowing the imported class.
DAG = DAG(
    dag_id='locations_test',
    schedule_interval=None,
    start_date=datetime(2020, 1, 1),
)


def get_locations_function(**kwargs):
    """Produce the locations that the downstream task loops over.

    The task context arriving in ``kwargs`` is ignored; the returned list is
    read back downstream via ``xcom_pull(task_ids='get_locations_task')``.
    """
    names = ('one', 'two', 'three')
    return list(names)


# Upstream task whose return value (the locations list) is pulled downstream.
get_locations_task = PythonOperator(
    python_callable=get_locations_function,
    task_id='get_locations_task',
    dag=DAG,
    provide_context=True,
)


def call_loop_over_locations_function(**kwargs):
    """Run one ``echo`` BashOperator per location pulled from XCom.

    The list returned by ``get_locations_task`` is pulled from XCom. For each
    entry, a throw-away BashOperator is built and executed inline with this
    task's own context, so every shell command runs inside this single task.

    :param kwargs: the task context supplied because the PythonOperator sets
        ``provide_context=True``; it must contain ``ti``.
    """
    import shlex  # local import: only this callable needs shell quoting

    ti = kwargs['ti']
    # xcom_pull yields None when nothing was pushed; treat that like an empty
    # list instead of failing on iteration.
    locations = ti.xcom_pull(task_ids='get_locations_task') or []
    for index, loc in enumerate(locations):
        print(loc)
        bash_operator = BashOperator(
            # Not attached to the DAG (no dag=...): the operator is executed
            # right here, never scheduled. Re-adding one task_id to the DAG on
            # every iteration triggers Airflow's duplicate-task-id warning
            # (an error in Airflow 2). The index keeps ids unique and within
            # the allowed task-id characters even if loc contains spaces.
            task_id='do_things_with_location_%d' % index,
            # Quote the value so quotes or shell metacharacters in a location
            # cannot break or alter the command.
            bash_command='echo %s' % shlex.quote(str(loc)))
        # execute() runs the command synchronously; its return value would be
        # discarded here, which is why xcom_push is not requested.
        bash_operator.execute(context=kwargs)


# Downstream task: loops over the locations and runs a bash command for each.
call_loop_over_locations_task = PythonOperator(
    python_callable=call_loop_over_locations_function,
    dag=DAG,
    task_id='call_loop_over_locations_task',
    provide_context=True,
)

# Declare the dependency from the downstream side: the loop waits for the list.
call_loop_over_locations_task << get_locations_task