
Dataflow - Are there windowed writes to BigQuery? I am trying to run a Dataflow job that reads 500 million rows from files and then writes them to BigQuery. When I ran it, it did not get past 15 million rows, so I am looking into whether some kind of windowed write to BigQuery would help. While running, I got many GC allocation failures, but I understand those are normal. I left the default disk size configured when running. Please help. If there are any examples of windowed writes to BigQuery, please share one.

As for the transformation, it is just a split of the string, followed by an insert into BigQuery.

Also, does the example below keep writing to BigQuery as it keeps streaming from Pub/Sub? https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/PubSubToBigQuery.java

My sample is below:

Pipeline pipeline = Pipeline.create(options);

PCollection<String> textData = pipeline.apply("Read Text Data",
        TextIO.read().from(options.getInputFilePattern()));

PCollection<TableRow> tr = textData.apply(ParDo.of(new FormatRemindersFn()));

tr.apply(BigQueryIO.writeTableRows()
        .withoutValidation()
        .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
        .withSchema(FormatRemindersFn.getSchema())
        //  .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
        .to(options.getSchemaDetails()));

static class FormatRemindersFn extends DoFn<String, TableRow> {

    @ProcessElement
    public void processElement(ProcessContext c) {
        try {
            if (StringUtils.isNotEmpty(c.element())) {
                String[] fields = c.element().split("\\^", 15);

                //  logger.info("Fields :{}", fields[2]);
                TableRow row = new TableRow().set("MODIFIED_DATE", fields[0])
                        .set("NAME", fields[1])
                        .set("ADDRESS", fields[2]);

                c.output(row);
            }
        } catch (Exception e) {
            logger.error("Error: {}", e.getMessage());
        }
    }
}
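
For reference, what I had in mind for the windowed write is roughly the sketch below. It is untested; the 5-minute window, triggering frequency, and shard count are placeholders, and it would only apply if the input were an unbounded source such as Pub/Sub rather than my bounded TextIO read:

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Group the rows into fixed windows before handing them to BigQueryIO.
PCollection<TableRow> windowed =
        tr.apply(Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(5))));

windowed.apply(BigQueryIO.writeTableRows()
        .to(options.getSchemaDetails())
        .withSchema(FormatRemindersFn.getSchema())
        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
        // FILE_LOADS with a triggering frequency batches an unbounded stream
        // into periodic load jobs instead of row-by-row streaming inserts.
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
        .withTriggeringFrequency(Duration.standardMinutes(5))
        .withNumFileShards(10)
        .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory()));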
500M should be easy for Dataflow. Can you confirm you're reading from GCS? How many files? Are you getting an error at 15M? - Graham Polley
The error was resolved after commenting out the per-element logging in the DoFn. - Roshan Fernando
Awesome. Can you provide your own answer for posterity? - Graham Polley

1 Answer


The error was resolved after commenting out the logging that the DoFn did for each element. Logging should not be done per element when processing that many elements.
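
If you still want visibility into progress, a Beam metrics counter is a much cheaper alternative to per-element logging, since the runner aggregates counters and surfaces them on the Dataflow job page. A minimal sketch of the same DoFn with counters (the counter names are just illustrative):

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.commons.lang3.StringUtils;

static class FormatRemindersFn extends DoFn<String, TableRow> {

    // Counters are aggregated by the runner and shown in the Dataflow UI,
    // so they avoid the cost of writing a log line for every element.
    private final Counter rowsProcessed = Metrics.counter(FormatRemindersFn.class, "rows-processed");
    private final Counter rowsFailed = Metrics.counter(FormatRemindersFn.class, "rows-failed");

    @ProcessElement
    public void processElement(ProcessContext c) {
        try {
            if (StringUtils.isNotEmpty(c.element())) {
                String[] fields = c.element().split("\\^", 15);
                c.output(new TableRow()
                        .set("MODIFIED_DATE", fields[0])
                        .set("NAME", fields[1])
                        .set("ADDRESS", fields[2]));
                rowsProcessed.inc();
            }
        } catch (Exception e) {
            rowsFailed.inc();  // count failures instead of logging each one
        }
    }
}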