4
votes

This portion of my pipeline is supposed to take an input, apply the appropriate tuple tag to it, and then do further processing on the input based on the tag it receives.

When I run the code below, the PCollection from the main tag (tag1) works properly. However, the additional tags (tag2, tag3) throw this error on the .apply():

Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Assign Output.out1 [PCollection]. Correct one of the following root causes: No Coder has been manually specified; you may do so using .setCoder(). Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for V.

Why does this error occur on tag2 but not on tag1? Note that if I make tag2 the main output and tag1/tag3 the additional outputs, and reorder the code accordingly, tag2 processing succeeds but tag1/tag3 throw the error.

Main Pipeline:

PCollectionTuple pct  = outputPair.apply("Assign Output", ParDo.of( new output())
              .withOutputTags(output.tag1, TupleTagList.of(output.tag2).and(output.tag3)));

//Tag1 Output
PCollection<KV<String, outResultPair>> tagPair1 = pct.get(output.tag1)
        .apply("Process", ParDo.of( new ABCOutput()));

//Tag2 Output 
PCollection<KV<String, outResultPair>> tagPair2 = pct.get(output.tag2)
        .apply("Process", ParDo.of( new DEFOutput())); //Error Thrown here

Supporting Classes:

    //ABCOutput Class 
    @DefaultCoder(AvroCoder.class)
    public class ABCOutput extends DoFn<KV<String, inResultPair>, KV<String, outResultPair>> {    
        @ProcessElement
        public void processElement(ProcessContext c) {
            KV<String, inResultPair> e = c.element();
            c.output( processInput(e) );
        }
    }

    //XYZOutput Class 
    @DefaultCoder(AvroCoder.class)
    public class XYZOutput extends DoFn<KV<String, inResultPair>, KV<String, outResultPair>> {    
        @ProcessElement
        public void processElement(ProcessContext c) {
            KV<String, inResultPair> e = c.element();
            c.output( processInput(e) );
        }
    }

    //Output Splitter
    @DefaultCoder(AvroCoder.class)
    public class output {
        private final static Logger LOG = LoggerFactory.getLogger(OutputHandler.class);

        final static TupleTag<KV<String,inResultPair>> tag1 = new TupleTag();
        final static TupleTag<KV<String,inResultPair>> tag2 = new TupleTag();
        final static TupleTag<KV<String,inResultPair>> tag3 = new TupleTag();
        @ProcessElement
        public void processElement(ProcessContext c) {
            KV<String, inResultPair> e = c.element();
            KV<String, outResultPair> out = process(e);

            switch(e.getValue().type){
                case 1:
                    c.output(tag1, out);
                break;
                case 2:
                    c.output(tag2, out);
                break;
                case 3:
                    c.output(tag3, out);
                break;
            }
            c.output();
        }

    }

1 Answer

11
votes

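You need to construct the TupleTags so that the Java compiler preserves their type information. Currently you construct them as raw types, so Beam's coder inference does not know the type of the elements output to each tag. The main output is unaffected, most likely because Beam can infer its type from the DoFn's declared output type rather than from the tag, which would explain why whichever tag you make the main output works.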

Change:

 final static TupleTag<KV<String,inResultPair>> tag1 = new TupleTag();

to:

 final static TupleTag<KV<String,inResultPair>> tag1 =
     new TupleTag<KV<String, inResultPair>>() {};

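The {} is critically important for preserving type information here: it creates an anonymous subclass of TupleTag, and that subclass's generic superclass records the type argument, so it can be read back at runtime through reflection.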
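
To see why the {} matters, here is a minimal sketch in plain Java with no Beam dependency. The `Tag` class and its `capturedType()` method are hypothetical stand-ins written for this illustration; they are not Beam's `TupleTag`, and Beam's real implementation may differ in detail. The sketch only shows the general Java mechanism: an anonymous subclass keeps its type argument in class metadata, while a plain instance does not.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

public class TypeCaptureDemo {

    // Hypothetical stand-in for a tag class (NOT Beam's TupleTag).
    static class Tag<V> {
        // Returns the type argument recorded in the generic superclass,
        // or null when the instance was created without a subclass.
        Type capturedType() {
            Type superclass = getClass().getGenericSuperclass();
            if (superclass instanceof ParameterizedType) {
                return ((ParameterizedType) superclass).getActualTypeArguments()[0];
            }
            return null;
        }
    }

    public static void main(String[] args) {
        // Plain instance: the runtime class is Tag itself, whose superclass
        // is Object, so nothing about V can be recovered.
        Tag<Map<String, Integer>> plain = new Tag<>();

        // Anonymous subclass: its generic superclass is
        // Tag<Map<String, Integer>>, which reflection can read.
        Tag<Map<String, Integer>> anonymous = new Tag<Map<String, Integer>>() {};

        System.out.println("plain:     " + plain.capturedType());
        System.out.println("anonymous: " + anonymous.capturedType());
    }
}
```

The plain instance reports null, while the anonymous one reports the full `Map` type. By analogy, a tag built with `new TupleTag()` gives coder inference nothing to work with, which is consistent with the "Unable to provide a Coder for V" message in the question.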