2 votes

I am trying to write a pipeline which periodically checks a Google Cloud Storage bucket for new .gz files that are actually compressed .csv files, and then writes those records to a BigQuery table. The following code was working in batch mode before I added the .watchForNewFiles(...) and .withMethod(STREAMING_INSERTS) parts, and I expect it to run in streaming mode with those changes. However, I am getting an exception for which I can't find anything related on the web. Here is my code:

public static void main(String[] args) {       

    DataflowDfpOptions options = PipelineOptionsFactory.fromArgs(args)
            //.withValidation()
            .as(DataflowDfpOptions.class);

    Pipeline pipeline = Pipeline.create(options);

    Stopwatch sw = Stopwatch.createStarted();
    log.info("DFP data transfer from GS to BQ has started.");

    pipeline.apply("ReadFromStorage", TextIO.read()
            .from("gs://my-bucket/my-folder/*.gz")
            .withCompression(Compression.GZIP)
            .watchForNewFiles(
                    // Check for new files every 30 seconds
                    Duration.standardSeconds(30),
                    // Never stop checking for new files
                    Watch.Growth.never()
            )
    )
            .apply("TransformToTableRow", ParDo.of(new TableRowConverterFn()))
            .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
                    .to(options.getTableId())
                    .withMethod(STREAMING_INSERTS)
                    .withCreateDisposition(CREATE_NEVER)
                    .withWriteDisposition(WRITE_APPEND)
                    .withSchema(TableSchema)); //todo: use withJsonSchema(String json) instead

    pipeline.run().waitUntilFinish();

    log.info("DFP data transfer from GS to BQ is finished in {} seconds.", sw.elapsed(TimeUnit.SECONDS));
}

/**
 * Creates a TableRow from a CSV line
 */
private static class TableRowConverterFn extends DoFn<String, TableRow> {

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {

        String[] split = c.element().split(",");

        //Ignore the header line
        //Since this is going to be run in parallel, we can't guarantee that the first line passed to this method will be the header
        if (split[0].equals("Time")) {
            log.info("Skipped header");
            return;
        }

        TableRow row = new TableRow();
        for (int i = 0; i < split.length; i++) {
            TableFieldSchema col = TableSchema.getFields().get(i);

            //String is the most common type, so it goes in the first if clause as a small optimization.
            if (col.getType().equals("STRING")) {
                row.set(col.getName(), split[i]);
            } else if (col.getType().equals("INTEGER")) {
                row.set(col.getName(), Long.valueOf(split[i]));
            } else if (col.getType().equals("BOOLEAN")) {
                row.set(col.getName(), Boolean.valueOf(split[i]));
            } else if (col.getType().equals("FLOAT")) {
                row.set(col.getName(), Float.valueOf(split[i]));
            } else {
                //Fall back to writing the value as a String.
                //todo: Consider other BQ data types.
                row.set(col.getName(), split[i]);
            }
        }
        c.output(row);
    }
}
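
(For reference, the code above uses a static TableSchema field that is defined elsewhere in my class. A minimal sketch of what it could look like is below; apart from Time, which the header check implies is the first column, the column names and types are hypothetical placeholders, not the real schema.)

//Minimal sketch of the static schema field referenced above.
//Requires com.google.api.services.bigquery.model.TableSchema / TableFieldSchema
//and Guava's ImmutableList; column names other than "Time" are made up.
private static final TableSchema TableSchema = new TableSchema().setFields(ImmutableList.of(
        new TableFieldSchema().setName("Time").setType("STRING"),
        new TableFieldSchema().setName("Impressions").setType("INTEGER"),
        new TableFieldSchema().setName("Revenue").setType("FLOAT")));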

And the stack trace:

java.lang.IllegalArgumentException: Not expecting a splittable ParDoSingle: should have been overridden
    at org.apache.beam.repackaged.beam_runners_google_cloud_dataflow_java.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
    at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.payloadForParDoSingle(PrimitiveParDoSingleFactory.java:167)
    at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.translate(PrimitiveParDoSingleFactory.java:145)
    at org.apache.beam.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:206)
    at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform(SdkComponents.java:86)
    at org.apache.beam.runners.core.construction.PipelineTranslation$1.visitPrimitiveTransform(PipelineTranslation.java:87)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
    at org.apache.beam.runners.core.construction.PipelineTranslation.toProto(PipelineTranslation.java:59)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:165)
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:684)
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:173)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at com.diply.data.App.main(App.java:66)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
    at java.lang.Thread.run(Thread.java:748)

Here is the command I use to submit the job to Dataflow:

clean compile exec:java -Dexec.mainClass=com.my.project.App "-Dexec.args=--runner=DataflowRunner --tempLocation=gs://my-bucket/tmp --tableId=Temp.TestTable --project=my-project --jobName=dataflow-dfp-streaming" -Pdataflow-runner

I use Apache Beam version 2.5.0. Here is the relevant section from my pom.xml:

 <properties>
   <beam.version>2.5.0</beam.version>
   <bigquery.version>v2-rev374-1.23.0</bigquery.version>
   <google-clients.version>1.23.0</google-clients.version>
   ...
 </properties>
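
(The corresponding dependency entries are not shown here; the ones these properties typically feed would look roughly like this generic sketch, not a copy of my actual pom, which is linked in the comments below:)

 <dependencies>
   <dependency>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-core</artifactId>
     <version>${beam.version}</version>
   </dependency>
   <dependency>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
     <version>${beam.version}</version>
   </dependency>
   <dependency>
     <groupId>com.google.apis</groupId>
     <artifactId>google-api-services-bigquery</artifactId>
     <version>${bigquery.version}</version>
   </dependency>
   ...
 </dependencies>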
What version of Dataflow are you using? I just ran your code without errors on 2.5.0. Can you also provide the command-line arguments with which you run your DataflowRunner? – The hBar Tender
@ThehBarTender I edited the question and added the details you asked for. Thanks. – hrzafer
Do you also get this error with a DirectRunner? Here's the code I'm running to test your issue: pastebin.com/RmjcsYLz. Are you able to run it without errors with your pom? If not, could you try with this pom pastebin.com/BKCZVnqX and tell me what happens? – The hBar Tender
@ThehBarTender Neither worked. Here is my pom: pastebin.com/AFA5EbZL. With the DirectRunner I don't get any errors and it successfully watches for new files, but it doesn't write data to BQ. So it doesn't work as expected in local mode either. – hrzafer
For your original question, see my answer below. If the BigQuery issue persists, consider opening a separate Stack Overflow question :-) – The hBar Tender

1 Answer

2 votes

Running the code with Dataflow 2.4.0 gives a more explicit error: java.lang.UnsupportedOperationException: DataflowRunner does not currently support splittable DoFn

However, this answer suggests that splittable DoFn has been supported since 2.2.0. This is indeed the case, and following this remark you need to add the --streaming option to your -Dexec.args to force the pipeline into streaming mode.
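
For example, with the command from your question this means appending --streaming to the pipeline arguments:

clean compile exec:java -Dexec.mainClass=com.my.project.App "-Dexec.args=--runner=DataflowRunner --streaming --tempLocation=gs://my-bucket/tmp --tableId=Temp.TestTable --project=my-project --jobName=dataflow-dfp-streaming" -Pdataflow-runner

Alternatively, streaming mode can be set in code via options.as(StreamingOptions.class).setStreaming(true) before Pipeline.create(options).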

I tested it with the code I supplied in the comments, with both your pom and mine, and both:
1. produce your error without --streaming
2. run fine with --streaming

You might want to open a GitHub issue against Beam, since this behavior is not officially documented anywhere as far as I know.