I am attempting to return a list of values from an Airflow task and then, within another task, loop over the list and call a BashOperator, passing each value as an argument to a Python script.
If a PythonOperator is the correct way to do this, I would be interested in seeing how, but I want the Python script I am calling to be an external file, not another callable in my Airflow DAG.
I am able to loop over the list and print out each value, but I can't figure out how to invoke the BashOperator that I have placed inside the loop.
Here is a simple version of the code, just attempting to echo the value:
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

DAG = DAG(
    dag_id='locations_test',
    start_date=datetime.now(),
    schedule_interval='@once'
)

def get_locations_function(**kwargs):
    locations = ['one', 'two', 'three']
    return locations

get_locations_task = PythonOperator(
    task_id='get_locations_task',
    python_callable=get_locations_function,
    provide_context=True,
    dag=DAG)

def call_loop_over_locations_function(**kwargs):
    ti = kwargs['ti']
    locations = ti.xcom_pull(task_ids='get_locations_task')
    for loc in locations:
        print(loc)  # this prints
        bash_operator = BashOperator(
            task_id='do_things_with_location',
            bash_command="echo '%s'" % loc,
            xcom_push=True,
            dag=DAG)
        # this doesn't get called, I have also tried
        # call_loop_over_locations_task >> bash_operator
        bash_operator

call_loop_over_locations_task = PythonOperator(
    task_id='call_loop_over_locations_task',
    python_callable=call_loop_over_locations_function,
    provide_context=True,
    dag=DAG)
get_locations_task >> call_loop_over_locations_task
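To make the intent concrete, here is the kind of shell command I ultimately want built and run once per value (`process_location.py` is a hypothetical stand-in for my external script; this sketch just shows the command strings, not the Airflow wiring):

```python
# Build one shell command per location value.
# "process_location.py" is a hypothetical placeholder for the external
# Python script I want each BashOperator to invoke.
locations = ['one', 'two', 'three']
commands = ["python process_location.py '%s'" % loc for loc in locations]

for cmd in commands:
    print(cmd)
```

So for the value `'one'`, the BashOperator would run `python process_location.py 'one'`, and so on for each value in the list.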