1
votes

I'm using Python (2.7) and working within Google's DataFlow environment, needless to say, Google hasn't fully flushed everything out yet, and documentation is not quite sufficient just yet. However, the portion for writing from Dataflow to BigQuery is documented here BigQuery Sink.

According to the documentation, in order to specify the schema, you need to input a string:

schema = 'field_1:STRING, field_2:STRING, field_3:STRING, created_at:TIMESTAMP, updated_at:TIMESTAMP, field_4:STRING, field_5:STRING'

The table name, project ID and dataset ID are like this: 'example_project_id:example_dataset_id.example_table_name'

Now, all of that is working. See the code below, but from what I can see, it is successfully creating the table and the fields. Note: The project ID is set as a part of the arguments for the function.

bq_data | beam.io.Write(
    "Write to BQ", beam.io.BigQuerySink(
        'example_dataset_id.{}'.format(bq_table_name),
        schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
    )
)

Now, it looks like I can get things inserting by using this:

bq_data = pipeline | beam.Create(
    [{
        'field_1': 'ExampleIdentifier',
        'field_2': 'ExampleValue',
        'field_3': 'ExampleFieldValue',
        'created_at': '2016-12-26T05:50:39Z',
        'updated_at': '2016-12-26T05:50:39Z',
        'field_4': 'ExampleDataIdentifier',
        'field_5: 'ExampleData'
    }]
)

But for some reason, when packing values into a PCollection, it says that it inserts into BigQuery, but when I query the table, it shows nothing.

Why isn't it inserting? I don't see any errors, yet nothing is inserting to BigQuery.

This is what the data looks like that is contained in the PCollection, I have close to 1,100 rows to insert:

{'field_1': 'ExampleIdentifier', 'field_2': 'ExampleValue', 'field_3': 'ExampleFieldValue', 'created_at': '2016-12-29 12:10:32', 'updated_at': '2016-12-29 12:10:32', 'field_4': 'ExampleDataIdentifier', 'field_5': 'ExampleData'}

Note: I checked into the date formatting, and the date formatting above is allowed for BigQuery insertion.

2
We are looking into this. Are you using DirectPipelineRunner (which is the default) ? Also can you give more details about your pipeline and how you verified that data is not available in BigQuery ? Make sure to call pipeline.run() at the end to execute your pipeline.chamikara
Here is a copy of the command used to run the dataflow with sample information put in: python -m dataflow_sample --runner DirectPipelineRunner --setup_file ./setup.py --job_name sample-dataflow-run-1 --server dev --worker_machine_type g1-small --num_workers 10 --start_date '2016-12-01' --end_date '2016-12-30' --devices device_id_1 device_id_2 device_id_3Jravict
As far as how I verified the data isn't in BigQuery, I am running a query against the very table where the information should be. And yes, I'm running pipeline.run() at the end of the dataflow run() function.Jravict
For good measure, here are the arguments available via command-line for this dataflow (Note: the arguments are created via argparse in Python): --server, required=False --region --output, required=False --project, required=False --bucket, required=False --job_name, required=False --staging_location, required=False --temp_location, required=False --runner, required=False --setup_file, required=False --devices, nargs="*", required=False --start_date, required=True --end_date, required=TrueJravict

2 Answers

0
votes

I tried an example with your exact schema and input and it worked for me. I had to do following fixes.

(1) Seems like you are not specifying a project in your arguments. You might be specifying this within your pipeline definition since you are not seeing an error for this. (2) There is a typo in the code you mentioned above. 'field_5: 'ExampleData' should be 'field_5': 'ExampleData' But I'm assuming this is just a typo in this question not in your original pipeline since you are not getting an error for this.

Are you running the latest version of Dataflow ? You can try creating a new virtual environment and run 'pip install google-cloud-dataflow' to install the latest version.

Is it possible to share your full pipleine for me to try out ?

It's hard to debug this remotely since you are using 'DirectPipelineRunner'. Is it possible to try running the same pipeline using 'DataflowPipelineRunner' (note that you'll need a GCP project with billing enabled for this) ? I will be able to view logs if you can run this using 'DataflowPipelineRunner' and provide a job id.

1
votes

This answer is pretty late, but maybe it'll help someone else. Your write statement in the pipeline is written incorrectly.

bq_data | 'Write to BigQuery' >> 
    beam.io.Write(beam.io.BigQuerySink(known_args.output_table, 
    schema=schema, 
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, 
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)) # This is overwrite whatever you have in your table