I have a Google Dataflow job using library version 1.9.1. The job takes runtime arguments, and we used TextIO.read().from().withoutValidation(). Since we migrated to Google Dataflow 2.0.0, withoutValidation has been removed. The release notes page doesn't mention this: https://cloud.google.com/dataflow/release-notes/release-notes-java-2

We tried to pass the input as a ValueProvider.RuntimeValueProvider, but during pipeline construction we get the following error. If we pass it as a ValueProvider, pipeline creation tries to validate the value provider. How do I provide a runtime value provider for a TextIO input in Google Cloud Dataflow 2.0.0?

java.lang.RuntimeException: Method getInputFile should not have return type RuntimeValueProvider, use ValueProvider instead.
    at org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:505)
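
For reference, here is a minimal options interface (a sketch, not our exact code) showing the declaration that triggers the error alongside the one the message asks for:

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;

// MyOptions is a hypothetical stand-in for our real options interface.
public interface MyOptions extends PipelineOptions {
    // Fails at construction: RuntimeValueProvider is an internal SDK class.
    // ValueProvider.RuntimeValueProvider<String> getInputFile();

    // What the error asks for: the generic ValueProvider interface.
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
}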

RuntimeValueProvider is an extremely low-level internal implementation-detail class that can never be used by users; the fact that it has public visibility is an unfortunate accident of the Java visibility system. What do you mean by "the pipeline creation is trying to validate the value provider"? I don't think that should be happening. Can you include your code and a complete printout of the error you're getting? - jkff
As @jkff has said, please post your code. I assume you are using templated pipelines? I'm using 2.1.0 with parameters for templates without any problems. - Graham Polley

1 Answer


I'm going to assume you are using templated pipelines and that your pipeline consumes runtime parameters. Here's a working example using Cloud Dataflow SDK version 2.1.0. It reads a file from GCS (the path is passed to the template at runtime), turns each row into a TableRow, and writes the rows to BigQuery. It's a trivial example, but it works with 2.1.0.

Program args are as follows:

 --project=<your_project_id>
 --runner=DataflowRunner
 --templateLocation=gs://<your_bucket>/dataflow_pipeline
 --stagingLocation=gs://<your_bucket>/jars
 --tempLocation=gs://<your_bucket>/tmp
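
Note that inputFile doesn't appear here: it is declared as a ValueProvider in the options interface, so it is only supplied when the template is launched (see the launch sketch after the code).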

Program code is as follows:

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

import java.util.ArrayList;
import java.util.List;

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE;

public class TemplatePipeline {
    public static void main(String[] args) {
        PipelineOptionsFactory.register(TemplateOptions.class);
        TemplateOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(TemplateOptions.class);
        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply("READ", TextIO.read().from(options.getInputFile()).withCompressionType(TextIO.CompressionType.GZIP))
                .apply("TRANSFORM", ParDo.of(new WikiParDo()))
                .apply("WRITE", BigQueryIO.writeTableRows()
                        .to(String.format("%s:dataset_name.wiki_demo", options.getProject()))
                        .withCreateDisposition(CREATE_IF_NEEDED)
                        .withWriteDisposition(WRITE_TRUNCATE)
                        .withSchema(getTableSchema()));
        pipeline.run();
    }

    private static TableSchema getTableSchema() {
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("year").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("day").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("wikimedia_project").setType("STRING"));
        fields.add(new TableFieldSchema().setName("language").setType("STRING"));
        fields.add(new TableFieldSchema().setName("title").setType("STRING"));
        fields.add(new TableFieldSchema().setName("views").setType("INTEGER"));
        return new TableSchema().setFields(fields);
    }

    // Declaring the option as ValueProvider<String> (not RuntimeValueProvider)
    // is what lets the value be supplied at template launch time.
    public interface TemplateOptions extends DataflowPipelineOptions {
        @Description("GCS path of the file to read from")
        ValueProvider<String> getInputFile();

        void setInputFile(ValueProvider<String> value);
    }

    // Splits each CSV line and maps the fields positionally onto the schema.
    private static class WikiParDo extends DoFn<String, TableRow> {
        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            String[] split = c.element().split(",");
            TableRow row = new TableRow();
            for (int i = 0; i < split.length; i++) {
                TableFieldSchema col = getTableSchema().getFields().get(i);
                row.set(col.getName(), split[i]);
            }
            c.output(row);
        }
    }
}
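
Once the template has been staged at templateLocation, you launch it and supply inputFile at runtime. A sketch using the gcloud CLI (the job name, bucket, and file path are placeholders, and the exact command group may vary with your gcloud version):

 gcloud dataflow jobs run wiki-demo \
     --gcs-location=gs://<your_bucket>/dataflow_pipeline \
     --parameters inputFile=gs://<your_bucket>/input/pageviews.csv.gz

This split is the whole point of ValueProvider: the pipeline graph is built once when the template is created, and TextIO only resolves the file pattern when the job actually runs.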