
I have a daily csv file coming into my bucket on Google Storage, and I built a function that loads this csv and appends it to a table in BigQuery when it arrives. However, I want to add a new column to the csv with the function execution id (context["id"]) before I load the data into BigQuery.

Is that possible?

Thanks in advance!

from google.cloud import bigquery


def TimeTableToBigQuery(data, context):
    # Get metadata about the uploaded file: the bucket, file name and time of insert
    execution_id = context['event_id']
    bucketname = data['bucket']
    filename = data['name']
    timeCreated = data['timeCreated']
    pathtofile = data["id"].rsplit("/", 2)
    # parent_folder = data["id"].rsplit("/", 3)
    file = str(pathtofile[1])
    name = file.split('---')
    dates = name[0].split('_', 1)
    arrivedat = str(dates[1])
    path = pathtofile[0]
    # parent_folder = parent_folder[1]

    # work start here to get the data into the table we establish a job before we send this job to load :)
    client = bigquery.Client()
    dataset_id = 'nature_bi'
    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.skip_leading_rows = 1
    job_config.field_delimiter = ';'
    job_config.allow_jagged_rows = True
    job_config.allow_quoted_newlines = True
    job_config.write_disposition = 'WRITE_TRUNCATE'
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.schema = [
        bigquery.SchemaField('Anstallningsnummer', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Datum', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Kod', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Kostnadsstalle', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Tidkod', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('OB_tidkod', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Dagsschema', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Schemalagd_arbetstid', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Summa_narvaro', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Summa_franvaro', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Datum_for_klarmarkering', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Datum_for_attestering', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Frislappsdatum', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Export_klockslag', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Vecka', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('RowHashKey', 'STRING', mode='NULLABLE', description='to be written in BQ'),
        bigquery.SchemaField('MergeState', 'INTEGER', mode='NULLABLE', description='for merging data in BQ'),
        bigquery.SchemaField('SourceName', 'STRING', mode='NULLABLE', description='Path to file'),
        bigquery.SchemaField('SourceScript', 'STRING', mode='NULLABLE', description='Name of the loading function'),
        bigquery.SchemaField('ArriveDateTime', 'STRING', mode='NULLABLE', description='Timestamp parsed from the file name'),
        bigquery.SchemaField('InsertDateTime', 'STRING', mode='NULLABLE', description='Time of insert into BQ'),
        bigquery.SchemaField('ExecutionID', 'STRING', mode='NULLABLE', description='Function execution id')
    ]

    uri = 'gs://%s/%s' % (bucketname, filename)
    print('Received file "%s" at %s.' % (
        uri,
        timeCreated
    ))
    tablename = 'employee_time'
    table_id = dataset_ref.table(tablename)

    # lets do this and send our job that we configured before to load to BQ
    load_job = client.load_table_from_uri(
        uri,
        table_id,
        job_config=job_config)
    # Here we print some information in the log to track our work
    print('Starting job with ID {}'.format(load_job.job_id))
    print('File: {}'.format(data['name']))
    load_job.result()  # wait for table load to complete.
    print('Job finished.')
    destination_table = client.get_table(dataset_ref.table(tablename))
    print('Loaded {} rows.'.format(destination_table.num_rows))


1 Answer


You have 3 ways to achieve this:

  • Rewrite the file (see the first sketch after this list)
    • Read your file line by line
    • On each line, add the fields that you want
    • Write the result to a local file (the /tmp directory is available; it's an in-memory filesystem and counts against your function's memory limit)
    • Then load this local file into your table
  • If you want to keep the existing data (WRITE_APPEND), see the second sketch after this list
    • Load the file as is into a temporary table
    • Wait for the end of the load job
    • Run a query like insert into <your table> select *, CURRENT_TIMESTAMP() AS InsertDateTime, <your executionId> AS ExecutionId FROM <temp table>
    • Then delete the temp table (or create it in a dataset with a default table expiration of 1 day). However, take care: a function can live up to 9 minutes, and if your files are large it could take time to perform all of this in one function. You can build something more complex (I can detail it if you want). Moreover, you query all the temp data to sink it into the final table, and that can have a cost if you have a lot of data.
  • If you perform a WRITE_TRUNCATE (as in your code example), you can do something smarter.

    • Delete the previously existing table
    • Load the file into a table with a name like nature_bi_<insertDate>_<executionId>
    • When you query, inject the table name into your query result (here I simply add the table name, but with a UDF or native BigQuery functions, you can extract the date and executionId)
SELECT *, (SELECT table_id
           FROM `<project>.<dataset>.__TABLES_SUMMARY__`
           WHERE table_id LIKE 'nature_bi%')
FROM `<project>.<dataset>.nature_bi*`
LIMIT 1000
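
Here is a minimal sketch of the first option (rewrite the file in /tmp, then load it). It reuses the bucket, file name, ';' delimiter and table names from your function; the local file names, the autodetect flag and the use of context.event_id are assumptions you would adapt (for example, keep your explicit schema instead of autodetect).

import csv

from google.cloud import bigquery, storage


def time_table_with_execution_id(data, context):
    execution_id = context.event_id          # the execution/event id of this invocation
    bucket_name = data['bucket']
    file_name = data['name']

    # 1. Download the uploaded CSV into /tmp (an in-memory filesystem).
    blob = storage.Client().bucket(bucket_name).blob(file_name)
    local_in, local_out = '/tmp/source.csv', '/tmp/with_id.csv'
    blob.download_to_filename(local_in)

    # 2. Rewrite it line by line, appending the ExecutionID column.
    with open(local_in, newline='') as src, open(local_out, 'w', newline='') as dst:
        reader = csv.reader(src, delimiter=';')
        writer = csv.writer(dst, delimiter=';')
        writer.writerow(next(reader) + ['ExecutionID'])   # extend the header row
        for row in reader:
            writer.writerow(row + [execution_id])

    # 3. Load the rewritten local file instead of the GCS URI.
    client = bigquery.Client()
    table_ref = client.dataset('nature_bi').table('employee_time')
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        field_delimiter=';',
        skip_leading_rows=1,
        write_disposition='WRITE_TRUNCATE',
        autodetect=True,                     # or keep the explicit schema from your code
    )
    with open(local_out, 'rb') as f:
        client.load_table_from_file(f, table_ref, job_config=job_config).result()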

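And a minimal sketch of the second option (temporary table + INSERT ... SELECT). The project id and the employee_time_tmp table are placeholders; if the final table has more columns than the temp one (RowHashKey, MergeState, ...), list the destination columns explicitly in the INSERT instead of relying on SELECT *, and cast CURRENT_TIMESTAMP() to STRING if you keep InsertDateTime as a STRING column.

from google.cloud import bigquery


def append_with_execution_id(data, context):
    execution_id = context.event_id
    uri = 'gs://{}/{}'.format(data['bucket'], data['name'])
    client = bigquery.Client()

    temp_table = 'your-project.nature_bi.employee_time_tmp'   # ideally in a dataset with a short default expiration
    final_table = 'your-project.nature_bi.employee_time'

    # 1. Load the CSV as is into the temporary table.
    load_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        field_delimiter=';',
        skip_leading_rows=1,
        write_disposition='WRITE_TRUNCATE',
        autodetect=True,
    )
    client.load_table_from_uri(uri, temp_table, job_config=load_config).result()

    # 2. Append into the final table, adding the extra columns on the fly.
    query = """
        INSERT INTO `{final}`
        SELECT *, CURRENT_TIMESTAMP() AS InsertDateTime, @execution_id AS ExecutionID
        FROM `{temp}`
    """.format(final=final_table, temp=temp_table)
    query_config = bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter('execution_id', 'STRING', execution_id)]
    )
    client.query(query, job_config=query_config).result()

    # 3. Drop the temp table (skip this if the dataset expires its tables by itself).
    client.delete_table(temp_table, not_found_ok=True)
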
All solutions are valid; it depends on your constraints and file size.