5
votes

My pipeline : Kafka -> Dataflow streaming (Beam v2.3) -> BigQuery

Given that low-latency isn't important in my case, I use FILE_LOADS to reduce the costs, like this :

BigQueryIO.writeTableRows()
  .withJsonSchema(schema)
  .withWriteDisposition(WriteDisposition.WRITE_APPEND)
  .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
  .withMethod(Method.FILE_LOADS)
  .withTriggeringFrequency(triggeringFrequency)      
  .withCustomGcsTempLocation(gcsTempLocation)
  .withNumFileShards(numFileShards) 
  .withoutValidation()
  .to(new SerializableFunction[ValueInSingleWindow[TableRow], TableDestination]() {
    def apply(element: ValueInSingleWindow[TableRow]): TableDestination = {
      ...
    }
  }

This Dataflow step is introducing an always bigger delay in the pipeline, so that it can't keep up with Kafka throughput (less than 50k events/s), even with 40 n1-standard-s4 workers. As shown on the screenshot below, the system lag is very big (close to pipeline up-time) for this step, whereas Kafka system lag is only a few seconds.

System lag introduced by BigQueryIO.Write

If I understand correctly, Dataflow writes the elements into numFileShards in gcsTempLocation and every triggeringFrequency a load job is started to insert them into BigQuery. For instance if I choose a triggeringFrequency of 5 minutes, I can see (with bq ls -a -j) that all the load jobs need less than 1 minute to be completed. But still the step is introducing more and more delay, resulting in Kafka consuming less and less elements (thanks to bcackpressure). Increasing/decreasing numFileShards and triggeringFrequency doesn't correct the problem.

I don't manually specify any window, I just the default one. Files are not accumulating in gcsTempLocation.

Any idea what's going wrong here?

1
Bigquery Import is not designed for this use case. Are your import jobs taking more than 5 minutes? Often if you submit many at once, they can get stalled while waiting for the others to complete. Are you sure that you are triggered every 5 minutes?Lara Schmidt
Just to add to Lara's comment, remember that BigQuery is a multi-tenanted system. That means that if demand for resources is particularly high at any given time, it means your load jobs (or queries - a load job is actually just a federated query under the hood) will take a hit. I'd agree with Lara that using bulk loading for a streaming pipeline isn't a great idea. Streaming inserts really isn't very expensive anyway ;-)Graham Polley
@LaraSchmidt yes I'm sure that my load jobs are triggered every 5 minutes and I can also see with bq ls -a -j that they take less than 1 minute to complete. They are not submitted many at once, just one job every 5 minutes.benjben
Also, in the documentation of FILE_LOADS, it states This method can be chosen for unbounded inputs as well, as long as a triggering frequency is also set, what is the use case then?benjben
The real question that needs to be answered is why Dataflow system lag for BigQueryIO.Write step is very big whereas load jobs are triggered every 5 minutes and require less than 1 minute?benjben

1 Answers

1
votes

You mention that you don't explicitly specify a Window, which means that by default Dataflow will use the "Global window". The windowing documentation contains this warning:

Caution: Dataflow's default windowing behavior is to assign all elements of a PCollection to a single, global window, even for unbounded PCollections. Before you use a grouping transform such as GroupByKey on an unbounded PCollection, you must set a non-global windowing function. See Setting Your PCollection's Windowing Function.

If you don't set a non-global windowing function for your unbounded PCollection and subsequently use a grouping transform such as GroupByKey or Combine, your pipeline will generate an error upon construction and your Dataflow job will fail.

You can alternatively set a non-default Trigger for a PCollection to allow the global window to emit "early" results under some other conditions.

It appears that your pipeline doesn't do any explicit grouping, but I wonder if internal grouping via BigQuery write is causing issues.

Can you see in the UI if your downstream DropInputs has received any elements? If not, this is an indication that data is getting held up in the upstream BigQuery step.