I have a simple pipeline using the Dataflow 2.1 SDK that reads data from Pub/Sub and then applies a DoFn to it:

PCollection<MyClass> e = streamData.apply("ToE", ParDo.of(new MyDoFNClass()));

I get the following error on this pipeline:

java.lang.IllegalStateException: Unable to return a default Coder for ToEvents/ParMultiDo(MyDoFNClass).out0 [PCollection]. Correct one of the following root causes: No Coder has been manually specified; you may do so using .setCoder(). Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for com.X.X.model.MyClass.

MyClass, the element type the DoFn outputs, is below:

@DefaultCoder(AvroCoder.class)

public class MyClass{

    public long id;
    public HashMap<String,HashSet<String>> a;

    @SerializedName("a")
    public Integer Id;
    @SerializedName("ae")
    public String ae;
}
1) What does MyDoFNClass look like? 2) Alternatively, have you tried specifying a coder manually using .setCoder() as the message suggests? (jkff)

2 Answers


Found the solution: I just needed to add implements Serializable to MyClass:

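import com.google.gson.annotations.SerializedName;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

@DefaultCoder(AvroCoder.class)
public class MyClass implements Serializable { // implementing Serializable is the fix

    public long id;
    public HashMap<String,HashSet<String>> a;

    @SerializedName("a")
    public Integer Id;
    @SerializedName("ae")
    public String ae;
}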
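As I understand it, this works because the CoderRegistry can fall back to SerializableCoder for any type that implements Serializable, and SerializableCoder encodes elements with plain Java serialization. The sketch below checks that round trip using only the JDK, not Beam itself. MyClassCopy is a hypothetical stand-in for MyClass with the Gson annotations dropped (so it compiles without Gson), and roundTrip is a hypothetical helper name. If any field type were not Serializable, writeObject would throw NotSerializableException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;

public class SerializationCheck {

    // Hypothetical stand-in for MyClass (Gson annotations dropped).
    static class MyClassCopy implements Serializable {
        public long id;
        public HashMap<String, HashSet<String>> a;
        public Integer Id;
        public String ae;
    }

    // Serializes the value to bytes and reads it back with Java serialization,
    // the mechanism SerializableCoder relies on.
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        MyClassCopy original = new MyClassCopy();
        original.id = 42L;
        original.a = new HashMap<>();
        original.a.put("tags", new HashSet<>(Arrays.asList("x", "y")));
        original.Id = 7;
        original.ae = "event";

        MyClassCopy copy = roundTrip(original);
        System.out.println(copy.id + " " + copy.Id + " " + copy.ae);
    }
}
```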

Below is some documentation about coders from the Beam programming guide:

The Beam SDKs require a coder for every PCollection in your pipeline. In most cases, the Beam SDK is able to automatically infer a Coder for a PCollection based on its element type or the transform that produces it, however, in some cases the pipeline author will need to specify a Coder explicitly, or develop a Coder for their custom type.

Each Pipeline object has a CoderRegistry object, which maps language types to the default coder the pipeline should use for those types. You can use the CoderRegistry yourself to look up the default coder for a given type, or to register a new default coder for a given type.
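A minimal sketch of registering and looking up a default coder, assuming the Beam 2.x Java SDK (org.apache.beam.sdk) is on the classpath. Event is a hypothetical stand-in for a custom element type such as MyClass. Inside a pipeline you would take the registry from pipeline.getCoderRegistry(); CoderRegistry.createDefault() is used here only so the example runs without a runner. If only one PCollection is affected, calling setCoder(...) on it, as the error message suggests, is the narrower alternative.

```java
import java.io.Serializable;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.SerializableCoder;

public class CoderRegistryExample {

    // Hypothetical element type standing in for MyClass.
    public static class Event implements Serializable {
        public long id;
        public String ae;
    }

    // Registers SerializableCoder as the default coder for Event in the given
    // registry, then asks the registry which coder it now infers for Event.
    static Coder<Event> registerAndLookUp(CoderRegistry registry) throws CannotProvideCoderException {
        registry.registerCoderForClass(Event.class, SerializableCoder.of(Event.class));
        return registry.getCoder(Event.class);
    }

    public static void main(String[] args) throws CannotProvideCoderException {
        // In a real pipeline: registerAndLookUp(pipeline.getCoderRegistry())
        Coder<Event> coder = registerAndLookUp(CoderRegistry.createDefault());
        System.out.println(coder);
    }
}
```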

See the following link for the default coders used by the Beam libraries: https://beam.apache.org/documentation/programming-guide/#default-coders-and-the-coderregistry

If the type you use in your PCollections is not covered by the default coders, you may have to provide a custom coder for it. For example, the implementations of PubsubIO.write() and PubsubIO.read() use a custom coder, PubsubMessagePayloadOnlyCoder.
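In the Beam 2.x Java SDK, a custom coder typically extends CustomCoder<T> and implements encode(T value, OutputStream outStream) and decode(InputStream inStream). The sketch below writes such an encode/decode pair for a hypothetical, trimmed-down version of MyClass (no map field) using only java.io, so the byte format can be tested without Beam; MyClassCodec and Record are made-up names. To use it in a pipeline, you would move these method bodies into a CustomCoder<MyClass> subclass and pass an instance to setCoder().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class MyClassCodec {

    // Hypothetical, trimmed-down element type.
    public static class Record {
        public long id;
        public Integer Id; // may be null
        public String ae;  // may be null
    }

    // Same shape as Coder.encode(T, OutputStream): write the fields in a fixed
    // order and leave the stream open, since more elements may follow.
    public static void encode(Record value, OutputStream outStream) throws IOException {
        DataOutputStream out = new DataOutputStream(outStream);
        out.writeLong(value.id);
        out.writeBoolean(value.Id != null); // presence flag for the nullable field
        if (value.Id != null) {
            out.writeInt(value.Id);
        }
        out.writeBoolean(value.ae != null);
        if (value.ae != null) {
            out.writeUTF(value.ae); // modified UTF-8, limited to 65535 encoded bytes
        }
        out.flush();
    }

    // Same shape as Coder.decode(InputStream): read back exactly what encode wrote,
    // in the same order. DataInputStream does not read ahead, so the bytes of the
    // next record stay in the stream.
    public static Record decode(InputStream inStream) throws IOException {
        DataInputStream in = new DataInputStream(inStream);
        Record r = new Record();
        r.id = in.readLong();
        r.Id = in.readBoolean() ? in.readInt() : null;
        r.ae = in.readBoolean() ? in.readUTF() : null;
        return r;
    }

    public static void main(String[] args) throws IOException {
        Record first = new Record();
        first.id = 1L;
        first.Id = 7;
        first.ae = "click";
        Record second = new Record();
        second.id = 2L; // Id and ae left null

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        encode(first, bytes);
        encode(second, bytes);

        InputStream in = new ByteArrayInputStream(bytes.toByteArray());
        Record a = decode(in);
        Record b = decode(in);
        System.out.println(a.id + " " + a.Id + " " + a.ae); // prints "1 7 click"
        System.out.println(b.id + " " + b.Id + " " + b.ae); // prints "2 null null"
    }
}
```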

For example, suppose you are converting strings into Pub/Sub messages. You can then set this coder on the resulting PCollection:

PCollection<PubsubMessage> pubsubMessagePCollection = pCollectionTuple.get(accountId);
pubsubMessagePCollection.setCoder(PubsubMessagePayloadOnlyCoder.of());