3
votes

I'm looking to use Dataflow to load data into BigQuery tables using BQ load jobs - not streaming (streaming would cost too much for our use case). I see that the Dataflow SDK has built-in support for inserting data via BQ streaming, but I wasn't able to find anything in the Dataflow SDK that supports load jobs out of the box.

Some questions:

1) Does the Dataflow SDK have OOTB support for BigQuery load job inserts? If not, is it planned?

2) If I need to roll my own, what are some good approaches?

If I have to roll my own, performing a BQ load job using Google Cloud Storage is a multi-step process - write the file to GCS, submit the load job via the BQ API, and (optionally) check the status until the job has completed (or failed). I'd hope I could use the existing TextIO.write() functionality to write to GCS, but I'm not sure how I'd compose that step with the subsequent call to the BQ API to submit the load job (and optionally the subsequent calls to check the status of the job until it's complete).
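
For concreteness, here's roughly what I imagine that load-job step looking like with the google-cloud-bigquery Java client (the dataset, table, and GCS path below are just placeholders):

    import com.google.cloud.bigquery.*;

    // Rough sketch of the manual route: assumes the files were already written to GCS
    // (e.g. by TextIO.write()); dataset, table, and GCS path are placeholders.
    static void loadFromGcs() throws InterruptedException {
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
      LoadJobConfiguration loadConfig =
          LoadJobConfiguration.newBuilder(
                  TableId.of("my_dataset", "my_table"),
                  "gs://my-bucket/output/window-*.json")
              .setFormatOptions(FormatOptions.json())
              .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
              .build();

      Job job = bigquery.create(JobInfo.of(loadConfig));  // submit the load job
      job = job.waitFor();                                // poll until the job completes (or fails)
      if (job.getStatus().getError() != null) {
        throw new RuntimeException("Load job failed: " + job.getStatus().getError());
      }
    }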

Also, I'd be using Dataflow in streaming mode, with windows of 60 seconds - so I'd want to do the load job every 60 seconds as well.

Suggestions?

2
(removed answer, and converted to comment). In batch mode, Dataflow will actually write to GCS, and then kick off BigQuery bulk load job(s) to get the data in. It should then delete the files in GCS after the pipeline has succeeded (or failed), but there is a bug with that (goo.gl/8rY1uk). In streaming mode, it will indeed use the streaming API. What kind of size are we talking about here? Streaming costs only $0.01 per 200MB. Maybe you could write 2 pipelines - one which writes to GCS (in streaming mode), and another in batch mode which scoops up those files and uses BQ's bulk loading? - Graham Polley
Interesting, didn't realize DF writes to BQ differently depending on whether the pipeline is batch or streaming. I still can't find the batch BQ load code in the DF SDK, but maybe it's not distributed with the SDK? Regarding streaming costs, the price is a bit high for our use case - about 6 billion 1KB rows/day, i.e. roughly 6 TB/day, which at $0.01 per 200MB is about $300/day, or around $9000/mo for BQ streaming inserts (it's the 1KB min row size that kills us as most of our rows are actually half that size). - Jon Chase
What about the idea of having 2 pipelines? One to write to GCS (streaming), and another (in batch) scooping that data up and batch loading to BQ? Could that work for you? Maybe one of the Google engineers will jump on here, and give some more (probably better :)) suggestions. - Graham Polley
Thanks for the suggestion - streaming to GCS and batching loads to BQ is what I'm thinking right now as well, though I don't like the operational overhead that adds. Guess I'll dig into the code a bit more to see if I can somehow make DF do the batch loads to BQ even in streaming mode. - Jon Chase
I happened to see this thread today -- this looks like a great feature request, which I will make internally. There's an obvious tension between latency and cost here -- can you give us some sense of what type of latency would be acceptable? E.g., streaming to GCS and then running a batch import job every 24h would do it... - Dan Halperin

2 Answers

2
votes

I'm not sure which version of Apache Beam you are using, but it's now possible to use a micro-batching tactic from a streaming pipeline. If you decide to go that way, you can use something like this:

.apply("Saving in batches", BigQueryIO.writeTableRows()
                    .to(destinationTable(options))
                    .withMethod(Method.FILE_LOADS)
                    .withJsonSchema(myTableSchema)
                    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    .withExtendedErrorInfo()
                    .withTriggeringFrequency(Duration.standardMinutes(2))
                    .withNumFileShards(1);
                    .optimizedWrites());

Things to keep in mind

  1. There are 2 different methods: FILE_LOADS and STREAMING_INSERTS. If you use the first one, you need to include withTriggeringFrequency and withNumFileShards. In my experience the triggering frequency is best expressed in minutes, and the right value depends on how much data you're pushing through: if you receive quite a lot, try to keep it small, as I have seen "stuck" errors when it's increased too much. The shards mostly affect your GCS billing - too many shards means more files created per table per x amount of minutes.
  2. If your input data size is not that big, streaming inserts can work really well and the cost shouldn't be a big deal. In that scenario you can use the STREAMING_INSERTS method and remove withTriggeringFrequency and withNumFileShards. You can also add withFailedInsertRetryPolicy, e.g. InsertRetryPolicy.retryTransientErrors(), so no rows are lost (keep in mind that idempotency is not guaranteed with STREAMING_INSERTS, so duplicates are possible); see the sketch after this list.
  3. You can check your jobs in BigQuery and validate that everything is working! Keep in mind BigQuery's limits on load jobs (I think it's 1000 jobs per table per day) when you are choosing the triggering frequency and number of shards.
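
For the second case above, a minimal sketch of the streaming-insert variant, reusing the same hypothetical destinationTable helper and schema as the snippet above:

    .apply("Streaming inserts", BigQueryIO.writeTableRows()
                        .to(destinationTable(options))           // same hypothetical helper as above
                        .withMethod(Method.STREAMING_INSERTS)
                        .withJsonSchema(myTableSchema)
                        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                        .withExtendedErrorInfo()
                        // retry transient failures so rows aren't dropped; streaming inserts
                        // are not idempotent, so duplicates are still possible
                        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));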

Note: You can always read this article about efficient aggregation pipelines https://cloud.google.com/blog/products/data-analytics/how-to-efficiently-process-both-real-time-and-aggregate-data-with-dataflow

0
votes

BigQueryIO.write() always uses BigQuery load jobs when the input PCollection is bounded. If you'd like it to also use them when the input is unbounded, specify .withMethod(FILE_LOADS).withTriggeringFrequency(...).
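
For example, in a streaming pipeline with 60-second windows like the one in the question, something along these lines should kick off a load job roughly once a minute (the table name and schema below are placeholders):

    rows.apply(BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table")                     // placeholder table
            .withSchema(tableSchema)                                  // placeholder schema
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .withTriggeringFrequency(Duration.standardSeconds(60))    // one load job per minute
            .withNumFileShards(1)                                     // needed alongside a triggering frequency
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND));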