I recently started working with Airflow. I'm working on a DAG that:
- Queries a MySQL database
- Extracts the results and stores them in a Google Cloud Storage bucket as a JSON file
- Uploads the stored JSON file to BigQuery
The DAG imports three operators: MySqlOperator, MySqlToGoogleCloudStorageOperator, and GoogleCloudStorageToBigQueryOperator; the surrounding boilerplate is sketched below.
I am using Airflow 1.8.0, Python 3.5, and Pandas 0.19.0.
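For context, the imports and DAG definition look roughly like this (I've trimmed the default args; the DAG id and start date here are placeholders, and the import paths are the contrib paths as I understand them in 1.8):

from datetime import datetime

from airflow import DAG
from airflow.operators.mysql_operator import MySqlOperator
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

# Daily run; dag id and start_date are placeholders for this post
dag = DAG(
    'mysql_to_bigquery',
    start_date=datetime(2017, 7, 1),
    schedule_interval='@daily',
)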
Here is the extract task, which is where the failure occurs:
sql2gcp_csv = MySqlToGoogleCloudStorageOperator(
    task_id='sql2gcp_csv',
    sql='airflow_gcp/aws_sql_extract_7days.sql',
    # bucket takes the bare bucket name; the 'gs://' prefix and path belong elsewhere
    bucket='{{ var.value.gcs_bucket }}',
    # '{}' is where the operator injects a file number if the export is split by size
    filename='{{ ds_nodash }}/{{ ds_nodash }}-account-{}.json',
    schema_filename='support/file.json',
    approx_max_file_size_bytes=1900000000,
    mysql_conn_id='aws_mysql',
    google_cloud_storage_conn_id='airflow_gcp',
    dag=dag,
)
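Downstream, the load into BigQuery is wired like this; it never runs because the extract task fails first, so I include it only for completeness (the project, dataset, and table names are placeholders):

gcs2bq = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs2bq',
    bucket='{{ var.value.gcs_bucket }}',
    # wildcard matches the numbered files written by the extract task
    source_objects=['{{ ds_nodash }}/{{ ds_nodash }}-account-*.json'],
    destination_project_dataset_table='my_project.my_dataset.accounts',
    schema_object='support/file.json',
    source_format='NEWLINE_DELIMITED_JSON',
    write_disposition='WRITE_TRUNCATE',
    google_cloud_storage_conn_id='airflow_gcp',
    bigquery_conn_id='airflow_gcp',
    dag=dag,
)

sql2gcp_csv >> gcs2bq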
However, when I run it, I receive the following error:
[2017-07-20 22:38:07,478] {models.py:1441} INFO - Marking task as FAILED.
[2017-07-20 22:38:07,490] {models.py:1462} ERROR - a bytes-like object is required, not 'str'
/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/models.py:1927: PendingDeprecationWarning: Invalid arguments were passed to MySqlOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'database': 'test'}
  category=PendingDeprecationWarning
/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/ti_deps/deps/base_ti_dep.py:94: PendingDeprecationWarning: generator '_get_dep_statuses' raised StopIteration
  for dep_status in self._get_dep_statuses(ti, session, dep_context):
Traceback (most recent call last):
  File "/home/User/airflow/workspace/env/bin/airflow", line 28, in <module>
    args.func(args)
  File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/bin/cli.py", line 422, in run
    pool=args.pool,
  File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/models.py", line 1374, in run
    result = task_copy.execute(context=context)
  File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 91, in execute
    files_to_upload = self._write_local_data_files(cursor)
  File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 136, in _write_local_data_files
    json.dump(row_dict, tmp_file_handle)
  File "/usr/lib/python3.5/json/__init__.py", line 179, in dump
TypeError: a bytes-like object is required, not 'str'
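I can reproduce the same TypeError outside of Airflow with a few lines, which makes me suspect the operator's temp-file handling under Python 3 (this is my own minimal test, not Airflow's code):

import json
from tempfile import NamedTemporaryFile

# NamedTemporaryFile opens in binary mode ('w+b') by default,
# but json.dump() writes str chunks, so the write fails on Python 3
with NamedTemporaryFile(delete=True) as tmp_file_handle:
    json.dump({'id': 1}, tmp_file_handle)
    # TypeError: a bytes-like object is required, not 'str'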
Does anyone know why this exception is thrown?