3
votes

I have a DAG that I want to schedule for a future date, based on certain inputs from an external source.

    {
        "run_id":"run-eventId109bnfghjasdmajjsd1basdasdaaasdsdasdsk2",
        "conf":{
            "softwareId":"something"
        },
        "execution_date": "11-03T09:10:30" //could be any future date
    }

Using this, the first task is always stuck in the queued state and remains there forever, even after the current time has reached the scheduled time given in the JSON. Is it possible to achieve what I am trying to do using Airflow? I would just like the task to be submitted for a future date, and the actual execution of the DAG to start at that particular date-time.

import random
import string

import requests
from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.contrib.sensors.python_sensor import PythonSensor
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

# DAG definition for the example. schedule_interval=None attaches no schedule
# to the DAG itself; presumably runs are created only by explicit/external
# triggers -- confirm against the Airflow version in use.
dag = DAG(
    dag_id="xcomsDag",
    start_date=datetime(year=2020, month=10, day=30, hour=0, minute=0),
    schedule_interval=None,
    description="A simple tutorial DAG",
)

# Entry task with no work of its own; it only anchors the start of the chain.
start = DummyOperator(task_id='run_this_first', dag=dag)

# Jinja expression that pulls the "approverId" XCom pushed by the
# wait_for_approval task. It is inserted verbatim into the pod script below;
# presumably Airflow renders it because it is passed through `arguments`
# -- confirm that `arguments` is a templated field in the operator version used.
_APPROVER_ID_TEMPLATE = (
    "{{ task_instance.xcom_pull(task_ids='wait_for_approval', key='approverId') }}"
)

# One-line program run via `python -c` inside the pod.
# - logging.warning replaces the deprecated logging.warn alias.
# - The final message now matches the actual sleep of 10 seconds (it previously
#   claimed 100 seconds).
# The single `{}` placeholder is filled with the Jinja expression above.
_POD_SCRIPT = (
    'import time; import logging; '
    'logging.warning("Hello World"); '
    'logging.warning("xcomm value:{}"); '
    'time.sleep(10); '
    'logging.warning("Printed after 10 seconds.")'
).format(_APPROVER_ID_TEMPLATE)

# Pod task that runs the script above in a python:3.6 container.
passing = KubernetesPodOperator(namespace='default',
                                image="python:3.6",
                                cmds=["python", "-c"],
                                arguments=[_POD_SCRIPT],
                                labels={"foo": "bar"},
                                name="passing-task",
                                task_id="passing-task",
                                get_logs=True,
                                dag=dag,
                                service_account_name="airflow-release"
                                )


def wait_for_approval(dag_run, **context):
    """Poll the status endpoint once and return the first entry's status.

    Used as the ``python_callable`` of the ``wait_for_approval`` sensor, so
    the returned value decides whether the sensor keeps poking.

    Args:
        dag_run: The current DAG run; its ``conf`` may carry ``softwareId``.
        **context: Task context; ``context["task_instance"]`` is used to push
            the ``approverId`` XCom.

    Returns:
        The ``status`` field of the first element of the endpoint's JSON body.

    Raises:
        requests.exceptions.RequestException: If the request fails or does not
            complete within the timeout.
    """
    # conf may be None (e.g. a run created without a payload); fall back to an
    # empty mapping so the default software id is used instead of crashing.
    run_conf = dag_run.conf or {}
    software_id = run_conf.get("softwareId", "Some Software Id")
    print("polling status for softwareId:{}".format(software_id))

    # Bound the call: without a timeout a stalled endpoint would block this
    # poke indefinitely.
    response = requests.get(
        'https://5f8582eec29abd0016190be2.mockapi.io/api/v1/status',
        timeout=10,
    )

    # Decode the body once and reuse it for both the log line and the result.
    payload = response.json()
    print(payload)

    # A fresh approver id is pushed on every invocation of this callable.
    context["task_instance"].xcom_push(
        key="approverId",
        value="visardan-" + random.choice(string.ascii_letters),
    )
    return payload[0]['status']


# Sensor task wrapping the wait_for_approval callable defined above.
# NOTE: this assignment rebinds the module-level name `wait_for_approval` from
# the function to the sensor. The function object is captured by
# `python_callable` before the rebinding takes place, so the sensor still
# calls the function.
wait_for_approval = PythonSensor(
    dag=dag,
    task_id="wait_for_approval",
    python_callable=wait_for_approval,
    provide_context=True,
    poke_interval=30,
    executor_config={
        "KubernetesExecutor": {"image": "apache/airflow:1.10.12-python3.6"},
    },
)

# Task order: start -> wait_for_approval -> passing.
start >> wait_for_approval
wait_for_approval >> passing

enter image description here

1

1 Answers

1
votes

You can create a DAG with a start date equal to the requested run time and use the @once schedule interval.

For example (scheduled for a future date in the year 2030):

# Same DAG as in the question, but with a future start_date (2030-01-01 00:00)
# and the '@once' schedule interval instead of None.
dag = DAG(
    dag_id="xcomsDag",
    start_date=datetime(year=2030, month=1, day=1, hour=0, minute=0),
    schedule_interval="@once",
    description="A simple tutorial DAG",
)

Note that if you need to schedule more than one future run, you can't use this solution unless you create a separate DAG for each run.