I have successfully created dynamic tasks in a DAG (Bash and Docker Operators), but I'm having a hard time passing the IDs of those dynamically created tasks to xcom_pull to grab their data.
for i in range(0, max_tasks):
    task_scp_queue = BashOperator(
        task_id="scp_queue_task_{}".format(i),
        bash_command="""python foo""",
        retries=3,
        dag=dag,
        pool="scp_queue_pool",
        queue="foo",
        provide_context=True,
        xcom_push=True)

    # Pull the manifest ID from the previous task via xcom
    task_process_queue = DockerOperator(
        task_id="process_task_{}".format(i),
        command="""python foo --queue-name={{ task_instance.xcom_pull(task_ids=scp_queue_task_{}) }}""".format(i),
        retries=3,
        dag=dag,
        pool="process_pool",
        api_version="auto",
        image="foo",
        queue="foo",
        execution_timeout=timedelta(minutes=5))

    task_manifest = DockerOperator(
        api_version="auto",
        task_id="manifest_task_{}".format(i),
        image="foo",
        retries=3,
        dag=dag,
        command=""" python --manifestid={{ task_instance.xcom_pull(task_ids=scp_queue_task_{}) }}""".format(i),
        pool="manfiest_pool",
        queue="d_parser")

    task_psql_queue.set_downstream(task_scp_queue)
    task_process_queue.set_upstream(task_scp_queue)
    task_manifest.set_upstream(task_process_queue)
As you can see, I tried calling Python's str.format on the Jinja template string to pass in the i variable, but that doesn't work.
I've also tried using "task.task_id", and building a separate string containing just the task_id, but neither of those works either.
Edit:
The command now looks like this:
command="""python foo \
--queue-name="{{
task_instance.xcom_pull(task_ids='scp_queue_task_{}') }}"
""".format(i)
And my debug logs from Airflow look like this:
Using Master Queue: process_{
task_instance.xcom_pull(task_ids='scp_queue_task_31') }
So the value of i is being substituted into the string, but the xcom_pull itself is never executed.
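To narrow this down, here is a minimal sketch that reproduces the symptom outside Airflow using only Python's str.format. The two helper functions are hypothetical and not part of my DAG. It suggests that format() treats the doubled braces as escaped literal braces and collapses them to single ones, which would match the single braces in the log output. Doubling them again is one possible workaround, assuming the operator's command is a templated field that Jinja renders afterwards.

```python
# Hypothetical stand-alone sketch: no Airflow required, only str.format.


def build_command_naive(i):
    # str.format treats "{{" and "}}" as escaped literal braces, so the
    # Jinja delimiters collapse to single braces in the resulting string.
    return "python foo --queue-name={{ task_instance.xcom_pull(task_ids='scp_queue_task_{}') }}".format(i)


def build_command_escaped(i):
    # Writing "{{{{" and "}}}}" leaves "{{" and "}}" in the result after
    # format() runs, which are the delimiters Jinja looks for.
    return "python foo --queue-name={{{{ task_instance.xcom_pull(task_ids='scp_queue_task_{}') }}}}".format(i)


if __name__ == "__main__":
    print(build_command_naive(31))
    print(build_command_escaped(31))
```

The first line printed has single braces, like the debug log, while the second keeps the double braces around the xcom_pull expression. Building the string by concatenation instead of format() should avoid the escaping issue in the same way.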