I have a dag which i want to schedule for a future date based on certain inputs from external source.
{
"run_id":"run-eventId109bnfghjasdmajjsd1basdasdaaasdsdasdsk2",
"conf":{
"softwareId":"something"
},
"execution_date": "11-03T09:10:30" //could be any future date
}
Using this the first task is always stuck at the queued state and forever remains in that state even when the current time has reached the scheduled time mentioned in json Is it possible to achieve what i am trying to do using airflow. I would just like the task to be submitted for a future date and on that particular date-time the actual execution of the dag should start.
import random
import string
import requests
from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.contrib.sensors.python_sensor import PythonSensor
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
dag = DAG(
dag_id='xcomsDag',
description='A simple tutorial DAG',
schedule_interval=None,
start_date=datetime(2020, 10, 30, 0, 0)
)
start = DummyOperator(task_id='run_this_first', dag=dag)
# arguments=["print('{}')".format("{{ task_instance.xcom_pull(task_ids='wait_for_approval', key='approverId') }}")],
passing = KubernetesPodOperator(namespace='default',
image="python:3.6",
cmds=["python", "-c"],
arguments=[
'import time; import logging; logging.warn("Hello World"); logging.warn("xcomm value:{}"); time.sleep(10); logging.warn("Printed after 100 seconds.")'.format(
"{{ task_instance.xcom_pull(task_ids='wait_for_approval', key='approverId') }}")],
labels={"foo": "bar"},
name="passing-task",
task_id="passing-task",
get_logs=True,
dag=dag,
service_account_name="airflow-release"
)
def wait_for_approval(dag_run, **context):
software_id = dag_run.conf.get("softwareId", "Some Software Id")
print("polling status for softwareId:{}".format(software_id))
response = requests.get('https://5f8582eec29abd0016190be2.mockapi.io/api/v1/status')
print(response.json())
context["task_instance"].xcom_push(key="approverId", value="visardan-" + random.choice(string.ascii_letters))
return response.json()[0]['status']
wait_for_approval = PythonSensor(
task_id="wait_for_approval",
python_callable=wait_for_approval,
dag=dag,
poke_interval=30,
provide_context=True,
executor_config={"KubernetesExecutor": {"image": "apache/airflow:1.10.12-python3.6"}}
)
start >> wait_for_approval >> passing
