1
votes

I have been trying to leverage the Airflow macros reference default variables and BigQuery labels to log metadata for queries submitted through Airflow BigQueryOperator. Here is the operator definition:

BigQuery_Labels_Test_Task = BigQueryOperator(
    task_id="BigQuery_Labels_Test_Task",
    bql="SELECT 1",
    use_legacy_sql=False,
    bigquery_conn_id="gcp_bq_connection",
    destination_dataset_table=f"test_dataset.test_table",
    create_disposition="CREATE_IF_NEEDED",
    write_disposition="WRITE_TRUNCATE",
    labels={
        "dag_id": "{{ dag.dag_id }}",
        "task_id": "{{ task.task_id }}",
        "run_id": "{{ run_id }}",
    },
    dag=dag,
)

But when it executes it is throwing up the following error:

[2020-06-01 08:15:13,495] {{taskinstance.py:887}} INFO - Executing <Task(BigQueryOperator): BigQuery_Labels_Test_Task> on 2020-06-01T07:51:40.752935+00:00
[2020-06-01 08:15:13,499] {{standard_task_runner.py:53}} INFO - Started process 16317 to run task
[2020-06-01 08:15:13,567] {{logging_mixin.py:112}} INFO - Running %s on host %s <TaskInstance: BigQuery_Labels_Test_DAG.BigQuery_Labels_Test_Task 2020-06-01T07:51:40.752935+00:00 [running]> b438a71d2d52
[2020-06-01 08:15:13,592] {{bigquery_operator.py:255}} INFO - Executing: SELECT 1
[2020-06-01 08:15:14,161] {{taskinstance.py:1128}} ERROR - <HttpError 400 when requesting https://bigquery.googleapis.com/bigquery/v2/projects/test-project/jobs?alt=json returned "Label value "manual__2020-06-01T07:51:40.752935+00:00" has invalid characters.">
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/contrib/operators/bigquery_operator.py", line 282, in execute
    encryption_configuration=self.encryption_configuration
  File "/usr/local/lib/python3.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 910, in run_query
    return self.run_with_configuration(configuration)
  File "/usr/local/lib/python3.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 1318, in run_with_configuration
    .execute(num_retries=self.num_retries)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/googleapiclient/_helpers.py", line 134, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/googleapiclient/http.py", line 907, in execute
    raise HttpError(resp, content, uri=self.uri)
googleapiclient.errors.HttpError: <HttpError 400 when requesting https://bigquery.googleapis.com/bigquery/v2/projects/test-project/jobs?alt=json returned "Label value "manual__2020-06-01T07:51:40.752935+00:00" has invalid characters.">
[2020-06-01 08:15:14,165] {{taskinstance.py:1151}} INFO - Marking task as UP_FOR_RETRY

Did anyone encounter anything like this? Does the label field in bq have any character restriction?

PS: When I hardcode something like the following for the label values, it works.:

labels={
    "dag_id": "dag_id",
    "task_id": "task_id",
},

Only lower case values works with hardcoding too.

1
Did the answer worked for you ? If yes, please accept it so that it is more visible and helpful to people facing similar issue. - Priya Agarwal

1 Answers

3
votes

This issue is not related to airflow but is to BigQuery. For defining labels,run_id doesn't fulfill below requirements(run_id has +,:):

Keys and values can contain only lowercase letters, numeric characters, underscores, and dashes. All characters must use UTF-8 encoding, and international characters are allowed.

For more info on BigQuery labels, please refer to this.