2
votes

I am trying to read about 90 gzipped JSON logfiles from Google Cloud Storage (GCS), each about 2 GB in size (10 GB uncompressed), parse them, and write them into a date-partitioned BigQuery (BQ) table via Google Cloud Dataflow (GCDF).

Each file holds 7 days of data; the whole date range is about 2 years (730 days and counting). My current pipeline looks like this:

p.apply("Read logfile", TextIO.Read.from(bucket))
 .apply("Repartition", Repartition.of())
 .apply("Parse JSON", ParDo.of(new JacksonDeserializer()))
 .apply("Extract and attach timestamp", ParDo.of(new ExtractTimestamps()))
 .apply("Format output to TableRow", ParDo.of(new TableRowConverter()))
 .apply("Window into partitions", Window.into(new TablePartWindowFun()))
 .apply("Write to BigQuery", BigQueryIO.Write
         .to(new DayPartitionFunc("someproject:somedataset", tableName))
         .withSchema(TableRowConverter.getSchema())
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

The Repartition step is something I built in while trying to make the pipeline reshuffle after decompression; I have tried running the pipeline with and without it. Parsing JSON works via a Jackson ObjectMapper and corresponding classes, as suggested here. The TablePartWindowFun is taken from here; it is used to assign a partition to each entry in the PCollection.
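
For context, the reshuffle idea looks roughly like the following sketch (simplified and illustrative rather than my exact code; the class name and the key range of 512 are arbitrary), assuming the Dataflow 1.x SDK:

import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import java.util.Random;

// Breaks fusion after the read by assigning random keys, grouping, and
// dropping the keys again, so elements get redistributed across workers.
public class Repartition<T> extends PTransform<PCollection<T>, PCollection<T>> {

  public static <T> Repartition<T> of() {
    return new Repartition<>();
  }

  @Override
  public PCollection<T> apply(PCollection<T> input) {
    return input
        .apply("Assign random key", ParDo.of(new DoFn<T, KV<Integer, T>>() {
          private transient Random random;

          @Override
          public void startBundle(Context c) {
            random = new Random();
          }

          @Override
          public void processElement(ProcessContext c) {
            c.output(KV.of(random.nextInt(512), c.element()));
          }
        }))
        .apply("Group by random key", GroupByKey.<Integer, T>create())
        .apply("Drop key", ParDo.of(new DoFn<KV<Integer, Iterable<T>>, T>() {
          @Override
          public void processElement(ProcessContext c) {
            for (T element : c.element().getValue()) {
              c.output(element);
            }
          }
        }));
  }
}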

The pipeline works for smaller files and not too many of them, but breaks for my real data set. I've selected large enough machine types and tried setting a maximum number of workers, as well as using autoscaling up to 100 n1-highmem-16 machines. I've tried streaming and batch mode and diskSizeGb values from 250 up to 1200 GB per worker.
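
For reference, the sizing knobs were set via the standard pipeline options, roughly like this sketch inside main(String[] args) (the classes are DataflowPipelineOptions and PipelineOptionsFactory from the Dataflow 1.x SDK; the values are just examples from my experiments, not a recommendation):

DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setWorkerMachineType("n1-highmem-16"); // also tried n1-highmem-4 / n1-highmem-2
options.setMaxNumWorkers(100);                 // upper bound for autoscaling
options.setDiskSizeGb(500);                    // tried values between 250 and 1200
Pipeline p = Pipeline.create(options);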

The possible solutions I can think of at the moment are:

  1. Uncompress all files on GCS first, thus enabling dynamic work splitting between workers, as it is not possible to leverage GCS's gzip transcoding
  2. Building "many" parallel pipelines in a loop, with each pipeline processing only a subset of the 90 files.

Option 2 seems to me like programming "around" a framework, is there another solution?

Addendum:

With the Repartition after reading the gzipped JSON files, in batch mode with a maximum of 100 workers (of type n1-highmem-4), the pipeline runs for about an hour with 12 workers and finishes the reading as well as the first stage of the Repartition. Then it scales up to 100 workers and processes the repartitioned PCollection. After it is done, the graph looks like this:

[Image: Write to BQ service graph]

Interestingly, when reaching this stage, it first processes up to 1.5 million elements/s, then the progress drops to 0. The size of the OutputCollection of the GroupByKey step in the picture first goes up and then back down, from about 300 million to 0 (there are about 1.8 billion elements in total), as if it were discarding something. Also, the run time of ExpandIterable and ParDo(Streaming Write) at the end is 0. The picture shows the job slightly before it starts running "backwards". In the worker logs I see some "exception thrown while executing request" messages coming from the com.google.api.client.http.HttpTransport logger, but I can't find more info in Stackdriver.

Without the Repartition after reading, the pipeline fails on n1-highmem-2 instances with out-of-memory errors at exactly the same step (everything after GroupByKey); using bigger instance types leads to exceptions like

java.util.concurrent.ExecutionException: java.io.IOException: 
CANCELLED: Received RST_STREAM with error code 8 dataflow-...-harness-5l3s 
talking to frontendpipeline-..-harness-pc98:12346
"but breaks for my real data set" - what exactly happens? What error(s) do you get?Graham Polley
I've added an example of the error.Tobi
I've added another example without the Repartition stepTobi
How does the pipeline behave when writing directly to a table, rather than using to(new DayPartitionFunc("someproject:somedataset", tableName))? In Batch pipelines, the use of window-based partitioning is currently experimental (the Javadoc says unsupported in Batch). It may not scale as well as writing directly to a specific table.Ben Chambers
If I target a specific table it works, but then I end up with a huge table (1.9 TB) which is very costly to subdivide into partitions via queries, considering I have more than 730 days.Tobi

2 Answers

1
votes

Thanks to Dan from the Google Cloud Dataflow Team and the example he provided here, I was able to solve the issue. The only changes I made:

  • Looping over the days in chunks of 175 days (25 weeks), running one pipeline after the other, so as not to overwhelm the system. In the loop, make sure the last files of the previous iteration are re-processed and that the startDate is moved forward at the same speed as the underlying data (175 days). Because WriteDisposition.WRITE_TRUNCATE is used, incomplete days at the end of a chunk are overwritten with correct, complete data this way (see the sketch after this list).

  • Using the Repartition/Reshuffle transform mentioned above, after reading the gzipped files, to speed up the process and allow smoother autoscaling

  • Using DateTime instead of Instant types, as my data is not in UTC
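
A rough sketch of that driver loop (buildPipeline is a hypothetical helper that wires up the steps from the question for the given date range; the dates are illustrative):

// Run one pipeline per 175-day chunk, one after the other.
// Assumes a blocking runner (or waiting for completion) between iterations.
org.joda.time.LocalDate start = new org.joda.time.LocalDate(2015, 1, 1); // example start
org.joda.time.LocalDate last  = new org.joda.time.LocalDate(2017, 1, 1); // example end
int chunkDays = 175; // 25 weeks

while (start.isBefore(last)) {
  org.joda.time.LocalDate end = start.plusDays(chunkDays);
  // buildPipeline selects all input files overlapping [start, end), which re-reads
  // the boundary file of the previous chunk; WRITE_TRUNCATE then overwrites the
  // incomplete trailing days with complete data.
  Pipeline p = buildPipeline(start, end); // hypothetical helper
  p.run();
  start = start.plusDays(chunkDays);      // advance at the same speed as the data
}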

UPDATE (Apache Beam 2.0):

With the release of Apache Beam 2.0, the solution became much easier: sharding BigQuery output tables is now supported out of the box.
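
For example, something along these lines should work with Beam 2.0's BigQueryIO (a sketch only; rows is assumed to be a PCollection<TableRow>, and the "date" field, the project/dataset names and the partition decorator are placeholders for whatever your rows carry):

rows.apply("Write to BigQuery", BigQueryIO.writeTableRows()
    .to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
      @Override
      public TableDestination apply(ValueInSingleWindow<TableRow> value) {
        // Derive the day from the row itself, e.g. a "date" field formatted as yyyyMMdd.
        String day = (String) value.getValue().get("date");
        String table = "someproject:somedataset.sometable$" + day; // partition decorator
        return new TableDestination(table, null /* table description */);
      }
    })
    .withSchema(TableRowConverter.getSchema())
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));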

0
votes

It may be worthwhile to allocate more resources to your pipeline by setting --numWorkers to a higher value when you run it. This is one of the possible solutions discussed in the "Troubleshooting Your Pipeline" online documentation, in the "Common Errors and Courses of Action" section.
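
For example (50 is an arbitrary illustrative value), either pass --numWorkers=50 on the command line, or set it programmatically on the DataflowPipelineOptions:

DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
options.setNumWorkers(50); // number of workers to start with; raise if the job is resource-starved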