41 votes

I'm running 5 DAG's which have generated a total of about 6GB of log data in the base_log_folder over a months period. I just added a remote_base_log_folder but it seems it does not exclude logging to the base_log_folder.

Is there anyway to automatically remove old log files, rotate them or force airflow to not log on disk (base_log_folder) only in remote storage?

I would be curious 2 years later what your solution was... experiencing this issue – Glenn Sampson

8 Answers

21 votes

Please refer to https://github.com/teamclairvoyant/airflow-maintenance-dags

This repository provides DAGs that can kill halted tasks and clean up logs. You can take the concepts from it and build a new DAG that cleans up according to your own requirements; a rough sketch follows below.
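For illustration, here is a minimal sketch of such a cleanup DAG (not taken from that repository); the DAG id, the 30-day retention, and the assumption that $AIRFLOW_HOME points at your Airflow installation are placeholders to adapt:

from airflow import DAG
from airflow.operators.bash import BashOperator  # Airflow 2.x import path
from airflow.utils.dates import days_ago

with DAG(
    dag_id='log_cleanup_example',      # hypothetical DAG id
    start_date=days_ago(1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    # Delete task log files older than 30 days, then prune empty directories.
    delete_old_logs = BashOperator(
        task_id='delete_old_task_logs',
        bash_command=(
            'find "${AIRFLOW_HOME}/logs" -type f -name "*.log" -mtime +30 -delete '
            '&& find "${AIRFLOW_HOME}/logs" -type d -empty -delete'
        ),
    )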

14 votes

We remove the task logs by implementing our own FileTaskHandler and pointing to it in airflow.cfg. In other words, we override the default log handler so it keeps only the N most recent task logs, without scheduling any additional DAGs.

We are using Airflow==1.10.1.

[core]
logging_config_class = log_config.LOGGING_CONFIG

log_config.LOGGING_CONFIG

import os

from airflow.configuration import conf

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
# Log levels for the handlers/loggers below; LOGGING_LEVEL comes from airflow.cfg.
LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
JOB_LOG_LEVEL = LOG_LEVEL  # level used for the 'airflow.task' logger

FOLDER_TASK_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}'
FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'

LOGGING_CONFIG = {
    'formatters': {},
    'handlers': {
        '...': {},
        'task': {
            'class': 'file_task_handler.FileTaskRotationHandler',
            'formatter': 'airflow.job',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
            'folder_task_template': FOLDER_TASK_TEMPLATE,
            'retention': 20
        },
        '...': {}
    },
    'loggers': {
        'airflow.task': {
            'handlers': ['task'],
            'level': JOB_LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        '...': {}
    }
}

file_task_handler.FileTaskRotationHandler

import os
import shutil

from airflow.utils.helpers import parse_template_string
from airflow.utils.log.file_task_handler import FileTaskHandler


class FileTaskRotationHandler(FileTaskHandler):

    def __init__(self, base_log_folder, filename_template, folder_task_template, retention):
        """
        :param base_log_folder: Base log folder to place logs.
        :param filename_template: template filename string.
        :param folder_task_template: template folder task path.
        :param retention: Number of folder logs to keep
        """
        super(FileTaskRotationHandler, self).__init__(base_log_folder, filename_template)
        self.retention = retention
        self.folder_task_template, self.folder_task_template_jinja_template = \
            parse_template_string(folder_task_template)

    @staticmethod
    def _get_directories(path='.'):
        # Return immediate subdirectories, or an empty list if the path does not exist yet.
        if not os.path.isdir(path):
            return []
        return next(os.walk(path))[1]

    def _render_folder_task_path(self, ti):
        if self.folder_task_template_jinja_template:
            jinja_context = ti.get_template_context()
            return self.folder_task_template_jinja_template.render(**jinja_context)

        return self.folder_task_template.format(dag_id=ti.dag_id, task_id=ti.task_id)

    def _init_file(self, ti):
        relative_path = self._render_folder_task_path(ti)
        folder_task_path = os.path.join(self.local_base, relative_path)
        # Sort the per-run subfolders (named by execution timestamp) so the
        # retention slice keeps the most recent runs and drops the oldest.
        subfolders = sorted(self._get_directories(folder_task_path))
        to_remove = set(subfolders) - set(subfolders[-self.retention:])

        for dir_to_remove in to_remove:
            full_dir_to_remove = os.path.join(folder_task_path, dir_to_remove)
            print('Removing', full_dir_to_remove)
            shutil.rmtree(full_dir_to_remove)

        return FileTaskHandler._init_file(self, ti)

3 votes

Airflow maintainers don't think truncating logs is part of Airflow's core logic (see this discussion), and in this issue the maintainers suggest changing the LOG_LEVEL to avoid producing too much log data.

And in this PR we can see how to change the log level in airflow.cfg; for example:
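A rough illustration, assuming Airflow 1.x where the option lives under [core] (in 2.x it moved to the [logging] section):

[core]
# the default is INFO; WARNING or ERROR produces far less log output
logging_level = WARNING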

Good luck.

2 votes

I know it sounds savage, but have you tried pointing base_log_folder to /dev/null? I use Airflow as part of a container, so I don't care about the files either, as long as the logger pipes to STDOUT as well.

Not sure how well this plays with S3 though.

0 votes

For your concrete problems, I have some suggestions. For those, you would always need a specialized logging config as described in this answer: https://stackoverflow.com/a/54195537/2668430

  • automatically remove old log files and rotate them

I don't have any practical experience with the TimedRotatingFileHandler from the Python standard library yet, but you might give it a try: https://docs.python.org/3/library/logging.handlers.html#timedrotatingfilehandler

It not only offers to rotate your files based on a time interval, but if you specify the backupCount parameter, it even deletes your old log files:

If backupCount is nonzero, at most backupCount files will be kept, and if more would be created when rollover occurs, the oldest one is deleted. The deletion logic uses the interval to determine which files to delete, so changing the interval may leave old files lying around.

Which sounds pretty much like the best solution for your first problem.
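As a rough sketch of what such a handler could look like with logging.config.dictConfig (the handler name, the log file path, and the retention values below are illustrative, not Airflow defaults):

import logging.config

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'rotating_file': {
            'class': 'logging.handlers.TimedRotatingFileHandler',
            'filename': '/usr/local/airflow/logs/tasks.log',  # illustrative path
            'when': 'midnight',   # roll over once per day
            'backupCount': 7,     # keep at most 7 rotated files, delete older ones
        },
    },
    'root': {
        'handlers': ['rotating_file'],
        'level': 'INFO',
    },
}

logging.config.dictConfig(LOGGING_CONFIG)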


  • force airflow to not log on disk (base_log_folder), but only in remote storage?

In this case you should specify the logging config in such a way that you do not have any logging handlers that write to a file, i.e. remove all FileHandlers.

Rather, try to find logging handlers that send the output directly to a remote address. E.g. CMRESHandler which logs directly to ElasticSearch but needs some extra fields in the log calls. Alternatively, write your own handler class and let it inherit from the Python standard library's HTTPHandler.
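For instance, a minimal sketch of such a custom handler built on HTTPHandler might look like this (the host, URL path, and logger name are placeholders, not real endpoints):

import logging
from logging.handlers import HTTPHandler


class RemoteLogHandler(HTTPHandler):
    """Hypothetical handler that POSTs each log record to a remote HTTP endpoint."""

    def __init__(self, host='logs.example.com', url='/ingest', secure=True):
        super().__init__(host, url, method='POST', secure=secure)


remote_handler = RemoteLogHandler()
logging.getLogger('airflow.task').addHandler(remote_handler)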


A final suggestion would be to combine the TimedRotatingFileHandler with an ElasticSearch + FileBeat setup: FileBeat ships the local log files to ElasticSearch (i.e. remote storage), while the backupCount retention policy of the TimedRotatingFileHandler keeps the amount of logs on your Airflow disk small.

0 votes

Apache Airflow usually consumes disk space for three reasons:

  1. Airflow scheduler log files
  2. MySQL binary logs [major]
  3. XCom table records

To clean these up on a regular basis I have set up a DAG that runs daily, purges the MySQL binary logs, and truncates the xcom table to free disk space. You may also need to install the MySQL connector (pip install mysql-connector-python). I delete the scheduler log files manually twice a week rather than automating it, to avoid the risk of deleting logs that are still needed for some reason.

I clean the log files with the sudo rm -rd airflow/logs/ command.

Below is my Python code for reference:

"""Example DAG demonstrating the usage of the PythonOperator."""

from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


args = {
    'owner': 'airflow',
    'email_on_failure':True,
    'retries': 1,
    'email':['Your Email Id'],
    'retry_delay': timedelta(minutes=5)
}


dag = DAG(
    dag_id='airflow_logs_cleanup',
    default_args=args,
    schedule_interval='@daily',
    start_date=days_ago(0),
    catchup=False,
    max_active_runs=1,
    tags=['airflow_maintenance'],
)

def truncate_table():
    import mysql.connector

    connection = mysql.connector.connect(host='localhost',
                                         database='db_name',
                                         user='username',
                                         password='your password',
                                         auth_plugin='mysql_native_password')
    cursor = connection.cursor()
    sql_select_query = """TRUNCATE TABLE xcom"""
    cursor.execute(sql_select_query)
    connection.commit()
    connection.close()
    print("XCOM Table truncated successfully")


def delete_binary_logs():
    import mysql.connector
    from datetime import datetime
    date = datetime.today().strftime('%Y-%m-%d')
    connection = mysql.connector.connect(host='localhost',
                                         database='db_name',
                                         user='username',
                                         password='your_password',
                                         auth_plugin='mysql_native_password')
    cursor = connection.cursor()
    query = 'PURGE BINARY LOGS BEFORE ' + "'" + str(date) + "'"

    sql_select_query = query
    cursor.execute(sql_select_query)
    connection.commit()
    connection.close()
    print("Binary logs deleted  successfully")

t1 = PythonOperator(
    task_id='truncate_table',
    python_callable=truncate_table,
    dag=dag,
)

t2 = PythonOperator(
    task_id='delete_binary_logs',
    python_callable=delete_binary_logs,
    dag=dag,
)

t1 >> t2


0 votes

I am surprised, but it worked for me. Update your config as below:

base_log_folder=""

It is tested with MinIO and with S3.

-1 votes

I don't think there is a rotation mechanism, but you can store the logs in S3 or Google Cloud Storage as described here: https://airflow.incubator.apache.org/configuration.html#logs
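For example, a minimal remote logging setup in airflow.cfg might look like this (the bucket path and connection id are placeholders; on Airflow 2.x these options live under [logging] rather than [core]):

[core]
remote_logging = True
# placeholder bucket path
remote_base_log_folder = s3://my-bucket/airflow/logs
# id of an existing S3/GCS connection
remote_log_conn_id = my_remote_log_conn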