I have created a DAG that extracts MySQL data from a database and loads it to Cloud storage then BigQuery as json files.
The DAG works for certain tables but not all, because it can't decode certain characters in the tables. It's quite a lot of data so I can't pin point exactly where the error or invalid characters are.
I've tried changing my database, table and column character sets from utf8 to utf8mb4. This didn't help.
I've also tried calling encoding='utf-8' as well as 'iso-8859-1', how ever I don't think I am calling them correctly because I have been doing this with my connection and I still get the same error.
I'm running Python 2.7.12 and airflow v1.8.0
Update: After reading this: https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls which suggests using a connection string that defines charset, ex.:sql_alchemy_conn = mysql://airflow@localhost:3306/airflow?charset=utf8
How would this be done with a Cloud SQL instance?
podio_connections = [
'mysql_connection'
]
podio_tables = [
'finance_banking_details',
'finance_goods_invoices',
]
default_args = {
'owner': 'xxxxxx',
'start_date': datetime(2018,1,11),
'depends_on_past': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('mysql_to_bigquery', default_args=default_args, schedule_interval='@daily')
slack_notify = SlackAPIPostOperator(
task_id='slack_notify',
token='xxxxxx',
channel='data-status',
username='airflow',
text='Successfully performed Podio ETL operation',
dag=dag)
for connection in podio_connections:
for table in podio_tables:
extract = MySqlToGoogleCloudStorageOperator(
task_id="extract_mysql_%s_%s"%(connection,table),
mysql_conn_id=connection,
google_cloud_storage_conn_id='gcp_connection',
sql="SELECT *, '%s' as source FROM podiodb.%s"%(connection,table),
bucket='podio-reader-storage',
filename="%s/%s/%s{}.json"%(connection,table,table),
schema_filename="%s/schemas/%s.json"%(connection,table),
dag=dag)
load = GoogleCloudStorageToBigQueryOperator(
task_id="load_bg_%s_%s"%(connection,table),
bigquery_conn_id='gcp_connection',
google_cloud_storage_conn_id='gcp_connection',
bucket='podio-reader-storage',
#destination_project_dataset_table="podio-data.%s.%s"%(connection,table),
destination_project_dataset_table = "podio-data.podio_data1.%s"%(table),
source_objects=["%s/%s/%s*.json"%(connection,table,table)],
schema_object="%s/schemas/%s.json"%(connection,table),
source_format='NEWLINE_DELIMITED_JSON',
write_disposition='WRITE_TRUNCATE',
dag=dag)
load.set_upstream(extract)
slack_notify.set_upstream(load)
[2018-01-12 15:36:10,221] {models.py:1417} ERROR - 'utf8' codec can't decode byte 0x96 in position 36: invalid start byte
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/mysql_to_gcs.py", line 91, in execute files_to_upload = self._write_local_data_files(cursor)
File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/mysql_to_gcs.py", line 136, in _write_local_data_files json.dump(row_dict, tmp_file_handle)
File "/usr/lib/python2.7/json/init.py", line 189, in dump for chunk in iterable:
File "/usr/lib/python2.7/json/encoder.py", line 434, in _iterencode for chunk in _iterencode_dict(o, _current_indent_level):
File "/usr/lib/python2.7/json/encoder.py", line 390, in _iterencode_dict yield _encoder(value)
UnicodeDecodeError: 'utf8' codec can't decode byte 0x96 in position 36: invalid start byte