
I am using Apache Beam 2.13.0 with the GCP Dataflow runner.

I have a problem with streaming ingest to BigQuery from a batch pipeline:

PCollection<BigQueryInsertError> stageOneErrors =
    destinationTableSelected
        .apply("Write BQ Attempt 1",
            BigQueryIO.<KV<TableDestination, TableRow>>write()
                .withMethod(STREAMING_INSERTS)
                .to(new KVTableDestination())
                .withFormatFunction(new KVTableRow())
                .withExtendedErrorInfo()
                .withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry())
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND))
        .getFailedInsertsWithErr();

The error:

 Shutting down JVM after 8 consecutive periods of measured GC thrashing. 
 Memory is used/total/max = 15914/18766/18766 MB, 
 GC last/max = 99.17/99.17 %, #pushbacks=0, gc thrashing=true. 
 Heap dump not written.

The same code works correctly in streaming mode (if the explicit method setting is omitted).

The code works on reasonably small datasets (less than 2 million records) but fails on 2.5 million and above.

On the surface, this appears to be a problem similar to the one described here: Shutting down JVM after 8 consecutive periods of measured GC thrashing

I am creating a separate question to add further details.

Is there anything I can do to fix this? The issue looks like it is inside the BigQueryIO component itself - the internal GroupByKey fails.


2 Answers

1 vote

The problem with transforms that contain a GroupByKey is that they wait until all of the data for the current window has been received before grouping.

In Streaming mode, this is normally fine as the incoming elements are windowed into separate windows, so the GroupByKey only operates on a small(ish) chunk of data.

In Batch mode, however, the current window is the Global Window, meaning that the GroupByKey waits for the entire input dataset to be read before any grouping is performed. If the input dataset is large, your worker will run out of memory, which explains what you are seeing here.
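
To make the windowing point concrete, here is a minimal, generic sketch in plain Beam (it has nothing to do with BigQueryIO's internal grouping, and the class name, keys and timestamps are made up purely for illustration): with fixed windows a GroupByKey emits one group per key per window, whereas in the Global Window it can only emit after the whole bounded input has been consumed.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class WindowedGroupByKeyExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Tiny keyed input; a real pipeline would read from a source instead.
    PCollection<KV<String, Integer>> keyed =
        p.apply(Create.of(KV.of("a", 1), KV.of("a", 2), KV.of("b", 3))
                      .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
         // Attach event timestamps so FixedWindows has something to assign on.
         .apply(WithTimestamps.<KV<String, Integer>>of(kv -> Instant.now()));

    // With one-minute fixed windows, each GroupByKey output covers a single
    // window of data. Without this step the elements stay in the Global
    // Window, so nothing can be grouped until the whole bounded input has
    // been read - which is the memory pressure described above.
    PCollection<KV<String, Iterable<Integer>>> grouped =
        keyed
            .apply(Window.<KV<String, Integer>>into(
                FixedWindows.of(Duration.standardMinutes(1))))
            .apply(GroupByKey.<String, Integer>create());

    p.run().waitUntilFinish();
  }
}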

This raises the question: why are you using BigQuery streaming inserts when processing batch data? Streaming inserts are relatively expensive (compared to bulk loads, which are free!) and have smaller quotas/limits than bulk import: even if you work around the issue you are seeing, there may be more problems yet to be discovered in BigQuery itself.
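
If you do move to the bulk path, a rough sketch of what the same write could look like is below. It reuses the KVTableDestination and KVTableRow classes from the question, so treat it as an untested adaptation rather than a verified fix: FILE_LOADS is the default method for bounded input, and the failed-insert retry policy and getFailedInsertsWithErr() shown in the question only apply to streaming inserts, so they are dropped here.

// Sketch: the question's write, switched to BigQuery load jobs (the default
// method for bounded input). KVTableDestination / KVTableRow come from the
// question. withFailedInsertRetryPolicy / getFailedInsertsWithErr() only
// apply to streaming inserts, so they are omitted; load-job failures surface
// as job errors instead.
destinationTableSelected
    .apply("Write BQ via load jobs",
        BigQueryIO.<KV<TableDestination, TableRow>>write()
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .to(new KVTableDestination())
            .withFormatFunction(new KVTableRow())
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND));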

1 vote

After extensive discussions with support and the developers, it has been communicated that using BigQuery streaming ingestion from a batch pipeline is discouraged and currently (as of 2.13.0) not supported.