I am building a Dataflow pipeline with Apache Beam. Below is the pseudocode:
PCollection<GenericRecord> rows = pipeline.apply("Read Json from PubSub", <some reader>)
.apply("Convert Json to pojo", ParDo.of(new JsonToPojo()))
.apply("Convert pojo to GenericRecord", ParDo.of(new PojoToGenericRecord()))
.setCoder(AvroCoder.of(GenericRecord.class, schema));
I am trying to get rid of setting the coder in the pipeline, because the schema won't be known at pipeline creation time (it will be present in the message).
I commented out the line that sets the coder and got an exception saying that the default coder is not configured. I then used the one-argument version of the AvroCoder.of method and got the following exception:
Not a Specific class: interface org.apache.avro.generic.GenericRecord
at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
... 9 more
Is there any way for us to supply the coder at runtime, without knowing the schema beforehand?
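To make the idea concrete: one direction that may be worth exploring is to let every element carry its own schema, so that nothing has to be known at pipeline creation time. Below is a minimal sketch of such a self-describing encoding using only the JDK. The class name SelfDescribingFrame and its byte layout are hypothetical, made up for illustration, and the payload is assumed to be the record's Avro binary produced with that same schema. In a Beam pipeline this logic would presumably sit inside the encode and decode methods of a custom coder; that wiring is an assumption and is not shown here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: frames a record so that its schema travels with it. */
final class SelfDescribingFrame {
    final String schemaJson; // writer schema as JSON text
    final byte[] payload;    // record body, assumed to be Avro binary for that schema

    SelfDescribingFrame(String schemaJson, byte[] payload) {
        this.schemaJson = schemaJson;
        this.payload = payload;
    }

    /** Layout: [schema length][schema UTF-8 bytes][payload length][payload bytes]. */
    byte[] encode() {
        byte[] schemaBytes = schemaJson.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(schemaBytes.length);
            out.write(schemaBytes);
            out.writeInt(payload.length);
            out.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Reads the schema first, so the decoder needs no schema up front. */
    static SelfDescribingFrame decode(byte[] encoded) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
            byte[] schemaBytes = new byte[in.readInt()];
            in.readFully(schemaBytes);
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            return new SelfDescribingFrame(
                new String(schemaBytes, StandardCharsets.UTF_8), payload);
        } catch (IOException e) {
            // Truncated or malformed input ends up here (for example EOFException).
            throw new UncheckedIOException(e);
        }
    }
}
```

The obvious cost is that the schema text is repeated in every element and has to be parsed again on every decode, so caching parsed schemas by their JSON text would probably be needed for any real throughput.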
… GenericRecord to TableRow and writes to BigQuery. It also uses AvroIO to write to file system (which is another problem I am facing btw, as AvroIO needs schema too). – Darshan Mehta
… null values for those fields. – Darshan Mehta