0 votes

I have a problem with Airflow: the first task in a DAG always starts and finishes successfully, but the second task never starts automatically.

I tried clearing the task in the UI, but it doesn't start. The only way I can get it running is to delete the running jobs in the database:

delete from job where state='running'

But I don't have many jobs in the running state: there is only one SchedulerJob, whose Latest Heartbeat is OK, plus 16 external task sensors waiting for this DAG.

The pool has 150 slots, with 16 tasks running and 1 scheduled.
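Rather than deleting rows from the job table, the same state can be read without touching anything. A minimal sketch, assuming Airflow 1.10's models and the capser pool used in the DAG below:

# Minimal sketch (assumes Airflow 1.10): read-only view of running jobs
# and pool occupancy, instead of deleting rows from the job table.
from airflow import settings
from airflow.jobs import BaseJob
from airflow.models import Pool

session = settings.Session()
for job in session.query(BaseJob).filter(BaseJob.state == 'running'):
    print(job.job_type, job.latest_heartbeat)

pool = session.query(Pool).filter(Pool.pool == 'capser').one()
print(pool.slots, pool.used_slots(session=session), pool.open_slots(session=session))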

  • The airflow scheduler is running
  • The airflow webserver is running
  • All DAGs are set to On in the web UI
  • All DAGs have a start date in the past
  • I restarted the scheduler a few hours earlier

And this is the DAG code in Airflow:

# Imports assumed for Airflow 1.10 (the version implied by the logs);
# set_exec_dt_variable, start_crawler, s3move, crawler_name and
# snapshot_key are defined elsewhere in this file.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.s3_key_sensor import S3KeySensor

default_args = {
  'owner': 'airgia',
  'depends_on_past': False,
  'retries': 2,
  'start_date': datetime(2018, 12, 1, 0, 0),
  'email': ['[email protected]'],
  'email_on_failure': False,
  'email_on_retry': False
}

dag = DAG('trigger_snapshot',
          default_args=default_args,
          dagrun_timeout=timedelta(hours=22),
          schedule_interval="0 0 * * 1,2,3,4,5,7",
          max_active_runs=1,
          catchup=False)

set_exec_dt = PythonOperator(
    task_id='set_exec_dt',
    python_callable=set_exec_dt_variable,
    dag=dag,
    pool='capser')

lanza_crawler = PythonOperator(
    task_id='lanza_crawler',
    op_kwargs={"crawler_name": crawler_name},
    python_callable=start_crawler,
    dag=dag,
    pool='capser')

copy_as_processed = PythonOperator(
    task_id='copy_as_processed',
    op_kwargs={"source_bucket": Variable.get("bucket"),
               "source_key": snapshot_key,
               "dest_bucket": Variable.get("bucket"),
               "dest_key": "{0}_processed".format(snapshot_key)},
    python_callable=s3move,
    dag=dag,
    pool='capser')

airflow_snapshot = S3KeySensor(
    task_id='airflow_snapshot',
    bucket_key=snapshot_key,
    wildcard_match=True,
    bucket_name=Variable.get("bucket"),
    timeout=8*60*60,
    poke_interval=120,
    dag=dag,
    pool='capser')


Fin_DAG_TC = DummyOperator(
    task_id='Fin_DAG_TC',
    dag=dag,
    pool='capser')


airflow_snapshot >> lanza_crawler >> set_exec_dt >> copy_as_processed >> Fin_DAG_TC
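
Since the second task is the one that stalls, it is worth confirming that the file imports cleanly and that the chain is wired as intended. A minimal sketch, assuming the DAG file path shown in the scheduler log below:

# Minimal sketch (path taken from the scheduler log): confirm the file
# imports without errors and the dependencies are wired as expected.
from airflow.models import DagBag

bag = DagBag('/usr/local/airflow/dags/capser/trigger_snapshot.py')
print(bag.import_errors)  # expect {}
dag = bag.get_dag('trigger_snapshot')
print(dag.get_task('lanza_crawler').upstream_task_ids)  # expect airflow_snapshot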

And this is what I see when I connect to the web UI every morning:

[screenshot: operator null]


[EDIT]

This is the latest scheduler log.

Here we can see the second task (lanza_crawler) being queued and sent to the executor, but it never starts.

[2018-12-11 03:50:54,209] {{jobs.py:1109}} INFO - Tasks up for execution:

[2018-12-11 03:50:54,240] {{jobs.py:1180}} INFO - DAG trigger_snapshot has 0/16 running and queued tasks

[2018-12-11 03:50:54,240] {{jobs.py:1218}} INFO - Setting the follow tasks to queued state:

[2018-12-11 03:50:54,254] {{jobs.py:1301}} INFO - Setting the follow tasks to queued state:

[2018-12-11 03:50:54,255] {{jobs.py:1343}} INFO - Sending ('trigger_snapshot', 'lanza_crawler', datetime.datetime(2018, 12, 10, 0, 0, tzinfo=), 1) to executor with priority 4 and queue default

[2018-12-11 03:50:54,255] {{base_executor.py:56}} INFO - Adding to queue: airflow run trigger_snapshot lanza_crawler 2018-12-10T00:00:00+00:00 --local --pool capser -sd /usr/local/airflow/dags/capser/trigger_snapshot.py

[2018-12-11 03:50:54,262] {{celery_executor.py:83}} INFO - [celery] queuing ('trigger_snapshot', 'lanza_crawler', datetime.datetime(2018, 12, 10, 0, 0, tzinfo=), 1) through celery, queue=default

[2018-12-11 03:50:54,749] {{jobs.py:1447}} INFO - Executor reports trigger_snapshot.airflow_snapshot execution_date=2018-12-10 00:00:00+00:00 as success for try_number 1

/usr/local/airflow/dags/capser/trigger_snapshot.py 1.53s 2018-12-11T03:50:54

...

/usr/local/airflow/dags/capser/trigger_snapshot.py 6866 0.68s 1.54s 2018-12-11T03:56:50
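
The scheduler clearly hands lanza_crawler to Celery, so the next question is whether any worker ever picked it up. A minimal sketch, assuming Airflow 1.10 (whose celery_executor module exposes the Celery app), to ask the workers directly:

# Minimal sketch (assumes Airflow 1.10's celery_executor module):
from airflow.executors.celery_executor import app

insp = app.control.inspect()
print(insp.reserved())  # received by a worker but not yet running
print(insp.active())    # currently executing
print(insp.stats())     # per-worker stats, including pool size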

And this is the latest worker log:

[2018-12-11 03:50:52,718: INFO/ForkPoolWorker-11] Task airflow.executors.celery_executor.execute_command[9a2e1ae7-9264-47d8-85ff-cac32a542708] succeeded in 13847.525094523095s: None

[2018-12-11 03:50:54,505: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[9ff70fc8-45ef-4751-b274-71e242553128]

[2018-12-11 03:50:54,983] {{settings.py:174}} INFO - setting.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800

[2018-12-11 03:50:55,422] {{__init__.py:51}} INFO - Using executor CeleryExecutor

[2018-12-11 03:50:55,611] {{models.py:271}} INFO - Filling up the DagBag from /usr/local/airflow/dags/capser/DAG_AURORA/DAG_AURORA.py

[2018-12-11 03:50:55,970] {{cli.py:484}} INFO - Running on host ip----*.eu-west-1.compute.internal
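
The worker received the message at 03:50:54, but the log shows no matching task start. To see whether messages are piling up in the broker, you can check the Celery queue length in Redis; a minimal sketch with redis-py, assuming a local Redis and Celery's default queue name (both assumptions, adjust to your broker URL):

# Minimal sketch (assumes redis-py, a Redis broker on localhost and
# Celery's default queue name): pending Celery messages sit in a Redis
# list named after the queue, so its length is the backlog.
import redis

r = redis.Redis(host='localhost', port=6379, db=0)
print(r.llen('default'))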

Have you checked your scheduler logs? Are you using Celery? – joebeeson
Yes, I've checked the scheduler logs and I can't see anything. And yes, I'm using Celery with a Redis broker. – cimujo
When you say you see nothing in your Celery logs, do you mean you don't see the task message? – joebeeson
I edited the post and added the log showing what I see. – cimujo
Based on your schedule_interval, 0 0 * * 1,2,3,4,5,7, it will run "at 00:00 on Monday, Tuesday, Wednesday, Thursday, Friday, and Sunday". Is this your expected schedule? – SMDC

1 Answer

0 votes

In the AWS graphs we saw that the workers' memory was about 80% occupied, so we decided to increase the number of workers, and the problem was solved.
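
If you hit the same symptom, a quick check is how much load each Celery worker is configured to accept. A minimal sketch, assuming Airflow 1.10, where the relevant option is worker_concurrency under [celery] in airflow.cfg:

# Minimal sketch (assumes Airflow 1.10): read the settings that bound
# per-worker load. Lowering worker_concurrency, or adding workers as
# described above, reduces per-worker memory pressure.
from airflow.configuration import conf

print(conf.getint('celery', 'worker_concurrency'))  # task slots per worker
print(conf.get('celery', 'broker_url'))             # the Redis broker in use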