I'm having trouble creating a Map PCollectionView with the DataflowRunner.

The pipeline below combines an unbounded CountingInput with values from a side input (containing 10 generated values). When I run the pipeline on GCP, it gets stuck inside the View.asMap() transform. More specifically, the ParDo(StreamingPCollectionViewWriter) step does not produce any output.

I tried this with Dataflow 2.0.0-beta3 as well as with beam-0.7.0-SNAPSHOT, without success. Note that the pipeline runs without any problem on the local DirectRunner.

Am I doing something wrong? All help is appreciated, thanks in advance for helping me out!

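import java.io.IOException;
import java.util.Map;

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.CountingInput;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleSideInputPipeline {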

    private static final Logger LOG = LoggerFactory.getLogger(SimpleSideInputPipeline.class);

    public interface Options extends DataflowPipelineOptions {}

    public static void main(String[] args) throws IOException {
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        Pipeline pipeline = Pipeline.create(options);

        final PCollectionView<Map<Integer, String>> sideInput = pipeline
                .apply(CountingInput.forSubrange(0L, 10L))
                .apply("Create KV<Integer, String>",ParDo.of(new DoFn<Long, KV<Integer, String>>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        c.output(KV.of(c.element().intValue(), "TEST"));
                    }
                }))
                .apply(View.asMap());

        pipeline
            .apply(CountingInput.unbounded().withRate(1, Duration.standardSeconds(5)))
            .apply("Aggregate with side-input",ParDo.of(new DoFn<Long, KV<Long, String>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    Map<Integer, String> map = c.sideInput(sideInput);

                    // Take the first value from the side-input map
                    Object[] values = map.values().toArray();
                    String firstVal = (String) values[0];
                    LOG.info("Combined: K: "+ c.element() + " V: " + firstVal + " MapSize: " + map.size());
                    c.output(KV.of(c.element(), firstVal));
                }
            }).withSideInputs(sideInput));

        pipeline.run();
    }
}
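
For reference, the result the pipeline above is meant to produce can be modeled in plain Java, without any Beam classes. This is only a sketch of the intended logic (a 10-entry map used as a lookup while processing each main-input element); the class and method names `SideInputModel`, `buildSideInput`, and `combine` are hypothetical and do not come from the original code or from Beam.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the pipeline's intended result; no Beam classes are used.
// All names here are hypothetical and chosen for illustration only.
class SideInputModel {

    // Mirrors the side-input branch: keys 0..n-1, each mapped to "TEST".
    static Map<Integer, String> buildSideInput(int n) {
        Map<Integer, String> map = new LinkedHashMap<>();
        for (int i = 0; i < n; i++) {
            map.put(i, "TEST");
        }
        return map;
    }

    // Mirrors the main DoFn: pair each element with the first value of the map.
    static List<String> combine(long[] elements, Map<Integer, String> sideInput) {
        String firstVal = sideInput.values().iterator().next();
        List<String> out = new ArrayList<>();
        for (long e : elements) {
            out.add("K: " + e + " V: " + firstVal + " MapSize: " + sideInput.size());
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, String> side = buildSideInput(10);
        // Three elements stand in for the unbounded main input.
        for (String line : combine(new long[] {0L, 1L, 2L}, side)) {
            System.out.println(line);
        }
    }
}
```

If the side input is delivered correctly, each log line from the real pipeline should show the same shape: the element, the value "TEST", and a map size of 10.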
Facing the same issue. Did you find any solution? - Kapil Barad
Still no solution. Did you find any in the meantime? - Wout
Seems to work for me (see my slightly modified code in BEAM-2155). - Udi Meiri

1 Answer

Don't worry that the ParDo(StreamingPCollectionViewWriterFn) does not record any output: what it actually does is write each element to an internal location.
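
The idea that a step can do useful work while reporting zero output elements can be pictured with a toy model. The sketch below is not Dataflow's actual StreamingPCollectionViewWriterFn; `ViewWriterModel`, `process`, and `storedView` are hypothetical names, and the in-memory map is only an assumed stand-in for the runner's internal storage.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: NOT the real Dataflow writer step.
class ViewWriterModel {
    // Hypothetical stand-in for the runner's internal side-input storage.
    private final Map<Integer, String> internalStore = new HashMap<>();

    // Consumes one element, stores it internally, and emits nothing downstream.
    List<String> process(int key, String value) {
        internalStore.put(key, value);
        return Collections.emptyList();
    }

    // Read-only view of what has been written so far.
    Map<Integer, String> storedView() {
        return Collections.unmodifiableMap(internalStore);
    }

    public static void main(String[] args) {
        ViewWriterModel writer = new ViewWriterModel();
        int emitted = 0;
        for (int i = 0; i < 10; i++) {
            emitted += writer.process(i, "TEST").size();
        }
        // The output counter stays at zero even though all ten elements were stored.
        System.out.println("emitted=" + emitted + ", stored=" + writer.storedView().size());
    }
}
```

In this model, an output count of zero says nothing about whether the elements were written, which is why the missing output on that step is not by itself a sign of a problem.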

Your code looks OK to me, and this should be investigated. I have filed BEAM-2155.