1
votes


I have a requirement to extract data from a BigQuery table using Dataflow and write it to a GCS bucket.
The Dataflow pipeline is built with Apache Beam (Java). It extracts from BigQuery and writes to GCS perfectly the first time.

But when a second Dataflow job is spun up to extract data from the same table after the first pipeline has executed successfully, it does not extract any data from BigQuery. The only error I can see in the Stackdriver log is:

"Request failed with code 409, performed 0 retries due to IOExceptions, performed 0 retries due to unsuccessful status codes, HTTP framework says request can be retried, (caller responsible for retrying): https://www.googleapis.com/bigquery/v2/projects/dataflow-begining/jobs"

The sample code I used for the extraction is:

pipeline.apply("Extract from BQ", BigQueryIO.readTableRows().fromQuery("SELECT * from bq_test.employee"));

Any help is appreciated.

1
Are you doing any special transformations or business logic on the way out in Dataflow? I ask because you don't have to use Dataflow: you could run the SQL in BigQuery and then call BigQuery's export API to dump the table(s) straight to GCS, which would also save you the cost of Dataflow. Also, are you aware that reading from BigQuery with a SQL query in your pipeline incurs a query charge? Rather than SELECT *, use from(TableReference) instead of fromQuery(SQL). - Graham Polley

1 Answer

4
votes

I have seen this happen before when using templates. As per the docs here, in the "Usage with templates" section:

When using read() or readTableRows() in a template, it's required to specify BigQueryIO.Read.withTemplateCompatibility(). Specifying this in a non-template pipeline is not recommended because it has somewhat lower performance.

and in the withTemplateCompatibility() section:

Use new template-compatible source implementation. This implementation is compatible with repeated template invocations.

If you are running this pipeline from a template, you should be using:

pipeline.apply("Extract from BQ", BigQueryIO
        .readTableRows()
        .withTemplateCompatibility()
        .fromQuery("SELECT * from bq_test.employee"));
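
To illustrate why a repeated template run might produce the 409, here is a toy model in plain Java. It is not Beam or BigQuery code. It rests on an assumption that the quoted docs do not spell out: that the default source fixes its BigQuery job ID when the pipeline graph is built, so every run of the same template submits the same ID, and the jobs endpoint rejects the duplicate with 409 (Conflict). FakeJobService, bakedAtConstruction and freshPerRun are hypothetical names made up for this sketch.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;

// Toy model only: none of these types exist in Beam or the BigQuery client.
class JobIdCollisionSketch {

    /** Stand-in for a job service that rejects an already-used job ID with 409. */
    static class FakeJobService {
        private final Set<String> seenJobIds = new HashSet<>();

        int insert(String jobId) {
            // Set.add returns false when the ID was already submitted.
            return seenJobIds.add(jobId) ? 200 : 409;
        }
    }

    /** Assumed non-template-compatible behaviour: ID chosen once, at graph construction. */
    static Supplier<String> bakedAtConstruction() {
        final String jobId = "beam_job_" + UUID.randomUUID();
        return () -> jobId;
    }

    /** Assumed template-compatible behaviour: a new ID for every run. */
    static Supplier<String> freshPerRun() {
        return () -> "beam_job_" + UUID.randomUUID();
    }

    /** Simulates two invocations of the same "template" and returns both status codes. */
    static int[] runTwice(FakeJobService service, Supplier<String> jobIdSource) {
        int first = service.insert(jobIdSource.get());
        int second = service.insert(jobIdSource.get());
        return new int[] {first, second};
    }

    public static void main(String[] args) {
        FakeJobService service = new FakeJobService();

        int[] baked = runTwice(service, bakedAtConstruction());
        System.out.println("baked ID:  " + baked[0] + " then " + baked[1]);

        int[] fresh = runTwice(service, freshPerRun());
        System.out.println("fresh IDs: " + fresh[0] + " then " + fresh[1]);
    }
}
```

In this model the second run with the baked-in ID is the one that gets the 409, which matches the symptom in the question: the first run works and later runs of the same template read nothing. If that is indeed what is going on, it would also explain why the docs only ask for withTemplateCompatibility() in templates, since a normal pipeline rebuilds its graph on every launch.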