
Environment:

  • RHEL 8.2 x86_64
  • Airflow 1.10.10
  • PostgreSQL 12.3
  • Python 3.6

Setup:
  • Airflow operates as user "svc_etl" which has permissions through group and user to Airflow home folder and DAG and log folders
  • DAG folder location on Windows Samba share (linked folder)
  • Task log folder location on Windows Samba share
  • Postgres and Airflow run as services (systemctl) on the same server (VM)
  • Airflow home folder on the services server

This setup is due to budgetary constraints.

Problem: Scheduled DAG runs are not executed and end in a failed state without any log. In some cases, using the Clear option in the Tree View of the DAG results in successful execution of the first task; each further Clear leads to execution of the subsequent tasks. These tasks can be either complete modules or individual functions. No errors appear in the Airflow scheduler log, nor are there any logs from the tasks themselves.
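For reference, parsing problems in the DAG files themselves can be ruled out with a minimal check like the sketch below (run as svc_etl on the scheduler server). DagBag and its import_errors attribute are standard Airflow 1.10 API; the path is the dags_folder from the config further down, and the snippet is only illustrative:

# Minimal sketch: confirm the scheduler's Python environment can parse the
# DAG files at all (run as the "svc_etl" service user).
from airflow.models import DagBag

dag_bag = DagBag(dag_folder="/ABTEILUNG/DWH/airflow/dags", include_examples=False)

print("DAGs found:   ", sorted(dag_bag.dags))
print("Import errors:", dag_bag.import_errors)  # file -> traceback for broken DAG files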

Steps undertaken:

  • Stopped the Airflow scheduler, deleted all DAGs and their logs from the folders and from the database, restarted the scheduler, copied the DAGs back into the folders and waited. The DAGs reappear in the GUI and in the database, but scheduled tasks still show up as failed in both. Clearing the failed state led to successful execution of the first task; after further Clears the first and subsequent tasks succeed (and write task logs).
  • Manual executions can lead (depending on the DAG definition) to the same behaviour.
  • The DAGs run successfully on a different Airflow test server that only executes sequentially (SequentialExecutor). The log and DAG folders of the two servers are in different locations and separated from each other; the executor difference is what the sketch after this list probes.
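Since the most obvious difference between the two servers is the executor, a small sketch like the following, run as svc_etl, can show whether a forked child process (which is how the LocalExecutor runs tasks) still sees the linked DAG folder. The path is the dags_folder from the config below; the script itself is only an illustration:

# Hypothetical check: the production server uses the LocalExecutor, which runs
# tasks in forked child processes, while the working test server executes
# sequentially. This verifies that a child process spawned under the same user
# can still read the linked DAG folder.
import multiprocessing
import os

DAG_FOLDER = "/ABTEILUNG/DWH/airflow/dags"

def list_dags(path):
    # Runs inside the child process, similar to a LocalExecutor worker.
    return os.listdir(path)

if __name__ == "__main__":
    with multiprocessing.Pool(processes=1) as pool:
        entries = pool.apply(list_dags, (DAG_FOLDER,))
    print("parent sees", len(os.listdir(DAG_FOLDER)), "entries")
    print("child sees ", len(entries), "entries")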

Questions:

  • How can I find out which component of Airflow / Postgres causes this behaviour? (A possible starting point is sketched below.)
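One possible starting point is the metadata database itself: task instances that end up "failed" with try_number = 0 were never started by a worker, which points at the scheduler/executor rather than the task code. The table and column names below are the standard Airflow 1.10 metadata schema; the connection string is only an example and has to be adjusted to the sql_alchemy_conn further down:

# Hypothetical sketch: list recently failed task instances that never ran.
from sqlalchemy import create_engine, text

engine = create_engine(
    "postgresql+psycopg2://svc_etl:<some-hard-password>@localhost:5436/airflow_db"
)

query = text(
    "SELECT dag_id, task_id, execution_date, state, try_number, hostname "
    "FROM task_instance "
    "WHERE state = 'failed' AND try_number = 0 "
    "ORDER BY execution_date DESC LIMIT 20"
)

with engine.connect() as connection:
    for row in connection.execute(query):
        print(dict(row))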

Airflow Config:

[core]
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository
# This path must be absolute
dags_folder = /ABTEILUNG/DWH/airflow/dags

# The folder where airflow should store its log files
# This path must be absolute
# > writing logs to the GIT main folder is a very bad idea and needs to change soon!!!
base_log_folder = /ABTEILUNG/DWH/airflow/logs

# Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search.
# Users must supply an Airflow connection id that provides access to the storage
# location. If remote_logging is set to true, see UPDATING.md for additional
# configuration requirements.
remote_logging = False
remote_log_conn_id =
remote_base_log_folder =
encrypt_s3_logs = False

# Logging level
logging_level = INFO
fab_logging_level = WARN

# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class =

# Log format
# Colour the logs when the controlling terminal is a TTY.
colored_console_log = True
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter

log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s

# Log filename format
log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /home/airflow/logs/dag_processor_manager/dag_processor_manager.log

# Hostname by providing a path to a callable, which will resolve the hostname
# The format is "package:function". For example,
# default value "socket:getfqdn" means that result from getfqdn() of "socket" package will be used as hostname
# No argument should be required in the function specified.
# If using IP address as hostname is preferred, use value "airflow.utils.net:get_host_ip_address"
hostname_callable = socket:getfqdn

# Default timezone in case supplied date times are naive
# can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam)
default_timezone = Europe/Berlin

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
#Default:
#executor = SequentialExecutor
executor = LocalExecutor

# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines; more information
# on their website
#sql_alchemy_conn = sqlite:////home/airflow/airflow.db
#! this needs to change to MySQL or Postgres database
sql_alchemy_conn = postgresql+psycopg2://localhost:5436/airflow_db?user=svc_etl&password=<some-hard-password>

# The encoding for the databases
sql_engine_encoding = utf-8

# If SqlAlchemy should pool database connections.
sql_alchemy_pool_enabled = True

# The SqlAlchemy pool size is the maximum number of database connections
# in the pool. 0 indicates no limit.
sql_alchemy_pool_size = 5

# The maximum overflow size of the pool.
# When the number of checked-out connections reaches the size set in pool_size,
# additional connections will be returned up to this limit.
# When those additional connections are returned to the pool, they are disconnected and discarded.
# It follows then that the total number of simultaneous connections the pool will allow is pool_size + max_overflow,
# and the total number of "sleeping" connections the pool will allow is pool_size.
# max_overflow can be set to -1 to indicate no overflow limit;
# no limit will be placed on the total number of concurrent connections. Defaults to 10.
sql_alchemy_max_overflow = 10

# The SqlAlchemy pool recycle is the number of seconds a connection
# can be idle in the pool before it is invalidated. This config does
# not apply to sqlite. If the number of DB connections is ever exceeded,
# a lower config value will allow the system to recover faster.
sql_alchemy_pool_recycle = 1800

# Check connection at the start of each connection pool checkout.
# Typically, this is a simple statement like “SELECT 1”.
# More information here: https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic
sql_alchemy_pool_pre_ping = True

# The schema to use for the metadata database
# SqlAlchemy supports databases with the concept of multiple schemas.
sql_alchemy_schema =

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
# > actually this parameter is more like max_active_tasks per Airflow state database across all processing servers
# > Default original: parallelism = 32
parallelism = 6

# The number of task instances allowed to run concurrently by the schedule
# > again more like max_active_tasks_for_worker (process)
# > Default original: dag_concurrency = 16
dag_concurrency = 4

# Are DAGs paused by default at creation
dags_are_paused_at_creation = True

# The maximum number of active DAG runs per DAG
# > Default original: max_active_runs_per_dag = 16
max_active_runs_per_dag = 4

# Whether to load the examples that ship with Airflow. It's good to
# get started, but you probably want to set this to False in a production
# environment
load_examples = False

# Where your Airflow plugins are stored
plugins_folder = /home/airflow/plugins

# Secret key to save connection passwords in the db
fernet_key = 
#! this needs to be set to enable obfuscated password shown in GUI

# Whether to disable pickling dags
donot_pickle = False

# How long before timing out a python file import
dagbag_import_timeout = 30

# How long before timing out a DagFileProcessor, which processes a dag file
dag_file_processor_timeout = 120

# The class to use for running task instances in a subprocess
task_runner = StandardTaskRunner

# If set, tasks without a `run_as_user` argument will be run with this user
# Can be used to de-elevate a sudo user running Airflow when executing tasks
default_impersonation =

# What security module to use (for example kerberos):
security =

# If set to False enables some unsecure features like Charts and Ad Hoc Queries.
# In 2.0 will default to True.
secure_mode = False

# Turn unit test mode on (overwrites many configuration options with test
# values at runtime)
unit_test_mode = False

# Name of handler to read task instance logs.
# Default to use task handler.
task_log_reader = task

# Whether to enable pickling for xcom (note that this is insecure and allows for
# RCE exploits). This will be deprecated in Airflow 2.0 (be forced to False).
enable_xcom_pickling = True

# When a task is killed forcefully, this is the amount of time in seconds that
# it has to cleanup after it is sent a SIGTERM, before it is SIGKILLED
killed_task_cleanup_time = 60

# Whether to override params with dag_run.conf. If you pass some key-value pairs through `airflow backfill -c` or
# `airflow trigger_dag -c`, the key-value pairs will override the existing ones in params.
dag_run_conf_overrides_params = False

# Worker initialisation check to validate Metadata Database connection
worker_precheck = False

# When discovering DAGs, ignore any files that don't contain the strings `DAG` and `airflow`.
dag_discovery_safe_mode = True

# The number of retries each task is going to have by default. Can be overridden at dag or task level.
default_task_retries = 0

[cli]
# In what way should the cli access the API. The LocalClient will use the
# database directly, while the json_client will use the api running on the
# webserver
api_client = airflow.api.client.local_client

# If you set web_server_url_prefix, do NOT forget to append it here, ex:
# endpoint_url = http://localhost:8080/myroot
# So api will look like: http://localhost:8080/myroot/api/experimental/...
endpoint_url = http://localhost:8080

[api]
# How to authenticate users of the API
auth_backend = airflow.api.auth.backend.default

[lineage]
# what lineage backend to use
backend =

[atlas]
sasl_enabled = False
host =
port = 21000
username =
password =

[operators]
# The default owner assigned to each new operator, unless
# provided explicitly or passed via `default_args`
default_owner = airflow
default_cpus = 1
default_ram = 512
default_disk = 512
default_gpus = 0

[hive]
...
[webserver]
# The base url of your website as airflow cannot guess what domain or
# cname you are using. This is used in automated emails that
# airflow sends to point links to the right web server
base_url = http://localhost:8080

# The ip specified when starting the web server
web_server_host = 0.0.0.0

# The port on which to run the web server
web_server_port = 8080
...
# Number of seconds the webserver waits before killing gunicorn master that doesn't respond
#Default:web_server_master_timeout = 120
web_server_master_timeout = 300

# Number of seconds the gunicorn webserver waits before timing out on a worker
#Default:web_server_worker_timeout = 120
web_server_worker_timeout = 300

# Number of workers to refresh at a time. When set to 0, worker refresh is
# disabled. When nonzero, airflow periodically refreshes webserver workers by
# bringing up new ones and killing old ones.
worker_refresh_batch_size = 1

# Number of seconds to wait before refreshing a batch of workers.
#Default:worker_refresh_interval = 30
worker_refresh_interval = 60

# Secret key used to run your flask app
secret_key = temporary_key

# Number of workers to run the Gunicorn web server
#Default:workers = 4
workers = 2

# The worker class gunicorn should use. Choices include
# sync (default), eventlet, gevent
worker_class = sync

# Log files for the gunicorn webserver. '-' means log to stderr.
#Default:access_logfile = -
#Default:error_logfile = -
access_logfile = -
error_logfile = /home/airflow/gunicorn.err
...
# Default DAG view.  Valid values are:
# tree, graph, duration, gantt, landing_times
dag_default_view = tree

# Default DAG orientation. Valid values are:
# LR (Left->Right), TB (Top->Bottom), RL (Right->Left), BT (Bottom->Top)
dag_orientation = LR

# Puts the webserver in demonstration mode; blurs the names of Operators for
# privacy.
demo_mode = False

# The amount of time (in secs) webserver will wait for initial handshake
# while fetching logs from other worker machine
#Default:log_fetch_timeout_sec = 5
log_fetch_timeout_sec = 30

# By default, the webserver shows paused DAGs. Flip this to hide paused
# DAGs by default
hide_paused_dags_by_default = False

# Consistent page size across all listing views in the UI
page_size = 100
...
# Define the color of navigation bar
#Default:navbar_color = #007A87
navbar_color = #1C33C7

# Default dagrun to show in UI
default_dag_run_display_number = 25
...
# Default setting for wrap toggle on DAG code and TI log views.
default_wrap = False
...
[email]
email_backend = airflow.utils.email.send_email_smtp

[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = <valid-SMTP-server-connection>
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
# smtp_user = 
# smtp_password = 
smtp_port = 25
smtp_mail_from = <valid-domain-address@domain>

[sentry]
...
[celery]
...
[dask]
...
[scheduler]
# Task instances listen for external kill signal (when you clear tasks
# from the CLI or the UI), this defines the frequency at which they should
# listen (in seconds).
job_heartbeat_sec = 5

# The scheduler constantly tries to trigger new tasks (look at the
# scheduler section in the docs for more information). This defines
# how often the scheduler should run (in seconds).
# Default original: scheduler_heartbeat_sec = 5
scheduler_heartbeat_sec = 20

# after how much time should the scheduler terminate in seconds
# -1 indicates to run continuously (see also num_runs)
run_duration = -1

# The number of times to try to schedule each DAG file
# -1 indicates unlimited number
num_runs = -1

# The number of seconds to wait between consecutive DAG file processing
# Default original: processor_poll_interval = 1
processor_poll_interval = 10

# after how much time (seconds) a new DAGs should be picked up from the filesystem
# Default original: min_file_process_interval = 0
min_file_process_interval = 10

# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
dag_dir_list_interval = 300

# How often should stats be printed to the logs
print_stats_interval = 30

# If the last scheduler heartbeat happened more than scheduler_health_check_threshold ago (in seconds),
# scheduler is considered unhealthy.
# This is used by the health check in the "/health" endpoint
scheduler_health_check_threshold = 30

child_process_log_directory = /home/airflow/logs/scheduler
...
# Turn off scheduler catchup by setting this to False.
# Default behavior is unchanged and
# Command Line Backfills still work, but the scheduler
# will not do scheduler catchup if this is False,
# however it can be set on a per DAG basis in the
# DAG definition (catchup)
catchup_by_default = True

# This changes the batch size of queries in the scheduling main loop.
# If this is too high, SQL query performance may be impacted by one
# or more of the following:
#  - reversion to full table scan
#  - complexity of query predicate
#  - excessive locking
#
# Additionally, you may hit the maximum allowable query length for your db.
#
# Set this to 0 for no limit (not advised)
max_tis_per_query = 512

# Statsd (https://github.com/etsy/statsd) integration settings
...
# The scheduler can run multiple threads in parallel to schedule dags.
# This defines how many threads will run.
max_threads = 2

authenticate = False

# Turn off scheduler use of cron intervals by setting this to False.
# DAGs submitted manually in the web UI or with trigger_dag will still run.
use_job_schedule = True

[ldap]
...
[kerberos]
ccache = /tmp/airflow_krb5_ccache
# gets augmented with fqdn
principal = airflow
reinit_frequency = 3600
kinit_path = kinit
keytab = airflow.keytab
...
[elasticsearch]
...
[kubernetes]
...

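For completeness, it can be worth confirming that the scheduler actually runs with these values, since environment variables (AIRFLOW__SECTION__KEY) can silently override airflow.cfg. A minimal sketch using the standard Airflow 1.10 configuration API; the list of keys is just a selection relevant to this problem:

# Print the settings the running installation actually resolves.
from airflow.configuration import conf

for section, key in [
    ("core", "dags_folder"),
    ("core", "base_log_folder"),
    ("core", "executor"),
    ("core", "sql_alchemy_conn"),
    ("scheduler", "child_process_log_directory"),
]:
    print("{}.{} = {}".format(section, key, conf.get(section, key)))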
Addendum: If I have forgotten anything, please comment and I will add it as soon as possible.

Comments:

  • What do you see in the scheduler logs? Do you see the regular heartbeats? Also in the webserver logs? – hopeIsTheonlyWeapon
  • Regular heartbeats are there. There is no webserver log; I never got that running. Gunicorn shows that workers get refreshed as expected. – Knut Boehnert

1 Answer


After a few hours of reconfiguring the linked directories involved, we found a solution and formed some suspicions about what the underlying problem might have been.

In this case the issue very likely came from linking the directories via autofs instead of a static mount. The likely underlying issues were:

  • autofs linked the share with the same user as the manual mount did, so manual checks appeared successful
  • the linked DAG folder was on a server in a different town and a different domain than the Airflow Scheduler server
  • the Airflow Scheduler, operating as that user, could not reach the directory in the other domain via autofs, although it worked via mount (a check like the sketch after this list makes this kind of failure visible)
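For illustration, a check like the sketch below (run as the service user, e.g. sudo -u svc_etl python3 check_autofs.py; the script name and structure are only examples) forces the autofs mount by accessing the path and then verifies it actually appears in /proc/mounts. The path is the dags_folder from the question's config:

# Hypothetical after-the-fact check for the autofs problem described above.
import os

DAG_FOLDER = "/ABTEILUNG/DWH/airflow/dags"   # path from the question's config

try:
    os.stat(DAG_FOLDER)                      # first access triggers autofs
    entries = os.listdir(DAG_FOLDER)
    print("listing ok, {} entries".format(len(entries)))
except OSError as exc:
    print("access failed:", exc)

with open("/proc/mounts") as mounts:
    mount_points = [line.split()[1] for line in mounts]

print("covered by a mount:", any(
    DAG_FOLDER == mp or DAG_FOLDER.startswith(mp + "/")
    for mp in mount_points if mp != "/"
))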

The solution used to get it working:

  • Move the DAG folder onto a linked directory that resides in the same domain as the Airflow Scheduler server