When I try to add a MyPipelineOptions parameter to my Google Dataflow DoFn as documented, I get the following exception:

java.lang.IllegalArgumentException:
com.xxx.MyProcessor,
@ProcessElement parseItem(PubsubMessage, MyPipelineOptions, OutputReceiver),
@ProcessElement parseItem(PubsubMessage, MyPipelineOptions, OutputReceiver),
parameter of type MyPipelineOptions at index 1:
MyPipelineOptions is not a valid context parameter.
Should be one of [BoundedWindow, RestrictionTracker<?, ?>]

If I change MyPipelineOptions to PipelineOptions, the error goes away, but casting back to MyPipelineOptions inside my function throws a ClassCastException, so I'm guessing that's not the right way. How do I pass my custom options class to the element processors?

Here's the code structure:

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;

public interface MyPipelineOptions extends DataflowPipelineOptions {
  ...
}

public class MyProcessor extends DoFn<PubsubMessage, String> {
  @ProcessElement
  public void parseItem(@Element PubsubMessage message, MyPipelineOptions po, OutputReceiver<String> out) throws Exception {
    ...
  }
}

Note the docs only show an example of non-custom PipelineOptions:

PipelineOptions: The PipelineOptions for the current pipeline can always be accessed in a process method by adding it as a parameter:

.of(new DoFn<String, String>() {
  public void processElement(
    @Element String word, PipelineOptions options) {

  }
 })
Normally I do it by passing it as a parameter to the constructor. See example - Guillem Xercavins
Not very convenient if I need to pass multiple arguments, and it misses the whole point of pipeline options availability and design :( Plus it doesn't explain why Beam explicitly says it's supported. - Sagi Mann

1 Answer

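OK, found the problem. The PipelineOptions argument is a proxy, so it can't simply be cast to MyPipelineOptions. To get the custom options, I need to declare the parameter as PipelineOptions and convert it with as():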

public class MyProcessor extends DoFn<PubsubMessage, String> {
  @ProcessElement
  public void parseItem(
      @Element PubsubMessage message,
      PipelineOptions po,
      OutputReceiver<String> out) throws Exception {

    // po is a proxy for the base interface; as() returns a view typed as MyPipelineOptions
    MyPipelineOptions opts = po.as(MyPipelineOptions.class);
    ...
  }
}
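
For anyone wondering why the cast fails but as() works, here is a rough stand-alone sketch of the mechanism using only java.lang.reflect.Proxy. It is a simplified model, not Beam's actual implementation: BaseOptions, MyOptions, Handler and the topic property are all hypothetical stand-ins for PipelineOptions and a custom options interface. The idea is that a proxy created for the base interface does not implement the custom interface, so the cast throws, while as() builds a new proxy for the requested interface on top of the same stored values.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

class OptionsProxySketch {

  // Hypothetical stand-in for PipelineOptions: the base interface exposes as().
  interface BaseOptions {
    <T extends BaseOptions> T as(Class<T> kind);
  }

  // Hypothetical stand-in for a custom options interface.
  interface MyOptions extends BaseOptions {
    String getTopic();
    void setTopic(String topic);
  }

  // A single handler stores the values; every view created from it shares them.
  static final class Handler implements InvocationHandler {
    private final Map<String, Object> values = new HashMap<>();

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      if (method.getDeclaringClass() == Object.class) {
        // equals/hashCode/toString fall back to the handler itself.
        return method.invoke(this, args);
      }
      String name = method.getName();
      if (name.equals("as")) {
        // Build a new proxy that implements the requested interface.
        return view((Class<?>) args[0]);
      }
      if (name.startsWith("set")) {
        values.put(name.substring(3), args[0]);
        return null;
      }
      if (name.startsWith("get")) {
        return values.get(name.substring(3));
      }
      throw new UnsupportedOperationException(name);
    }

    Object view(Class<?> kind) {
      return Proxy.newProxyInstance(kind.getClassLoader(), new Class<?>[] {kind}, this);
    }
  }

  // Creates a proxy that implements only the base interface.
  static BaseOptions create() {
    return (BaseOptions) new Handler().view(BaseOptions.class);
  }

  // Returns true if casting the base proxy to MyOptions throws ClassCastException.
  static boolean directCastFails(BaseOptions base) {
    try {
      MyOptions unused = (MyOptions) base;
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    BaseOptions base = create();
    System.out.println(directCastFails(base)); // prints "true"

    MyOptions mine = base.as(MyOptions.class);
    mine.setTopic("demo");
    // A second view sees the same value because the handler is shared.
    System.out.println(base.as(MyOptions.class).getTopic()); // prints "demo"
  }
}
```

In the sketch, as in the DoFn above, the object handed to you only implements the base interface, so asking for a typed view is the way to reach the custom getters.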