I am building a Dataflow pipeline with Apache Beam. Below is the pseudocode:

PCollection<GenericRecord> rows = pipeline.apply("Read Json from PubSub", <some reader>)
    .apply("Convert Json to pojo", ParDo.of(new JsonToPojo()))
    .apply("Convert pojo to GenericRecord", ParDo.of(new PojoToGenericRecord()))
    .setCoder(AvroCoder.of(GenericRecord.class, schema));

I am trying to avoid setting the coder in the pipeline, because the schema won't be known at pipeline creation time (it will be present in the message).

I commented out the line that sets the coder and got an exception saying that no default coder is configured. I then used the one-argument version of AvroCoder.of and got the following exception:

Not a Specific class: interface org.apache.avro.generic.GenericRecord
    at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
    at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
    at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
    at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
    at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
    at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
    at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
    at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
    ... 9 more

Is there any way to supply the coder at runtime, without knowing the schema beforehand?

What does the pipeline do with the GenericRecords downstream? Is it possible to simply not use GenericRecord? - jkff
@jkff It converts GenericRecord to TableRow and writes to BigQuery. It also uses AvroIO to write to the file system (which is another problem I am facing, by the way, as AvroIO needs a schema too). - Darshan Mehta
What schema do you want the BigQuery table and the generated Avro files to have? It sounds like you're writing records with many different schemas, so I suppose they have to go into different tables and different files? - jkff
@jkff We are using schema evolution, so both schemas conform to the same BigQuery table. One schema will have extra fields and the other won't; we will insert null values for those fields. - Darshan Mehta

1 Answer

This is possible. I recommend the following approach:

  • Do not use an intermediate collection of type GenericRecord. Keep it as a collection of your POJOs.
  • Write a transform that extracts the schema of your data and makes it available as a PCollectionView<however you want to represent the schema>.
  • When writing to BigQuery, write your PCollection<YourPojo> via BigQueryIO.write().to(DynamicDestinations). When writing to Avro, use FileIO.write() or FileIO.writeDynamic() in combination with AvroIO.sinkViaGenericRecords(). Both can take a dynamically computed schema from a side input (the view computed in the previous step).
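
To make the "dynamically computed schema" idea concrete, here is a minimal plain-Java sketch of the logic only. It does not use Beam: the class SchemaUnionSketch and the methods unionSchema and conform are hypothetical names, records are modelled as maps, and the schema is modelled as an ordered list of field names. In a real pipeline, something like unionSchema would feed the side input and something like conform would run inside the BigQuery format function or the Avro record formatter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration; not part of the Beam API.
public class SchemaUnionSketch {

    // Collects every field name seen across the records, in first-seen order.
    // Assumption: the "schema" is just an ordered list of field names.
    public static List<String> unionSchema(List<Map<String, Object>> records) {
        Set<String> fields = new LinkedHashSet<>();
        for (Map<String, Object> record : records) {
            fields.addAll(record.keySet());
        }
        return new ArrayList<>(fields);
    }

    // Projects one record onto the schema; fields the record lacks become null.
    public static Map<String, Object> conform(Map<String, Object> record, List<String> schema) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (String field : schema) {
            row.put(field, record.get(field)); // Map.get returns null for a missing key
        }
        return row;
    }

    public static void main(String[] args) {
        // A record written with the older schema (no "email" field).
        Map<String, Object> oldVersion = new LinkedHashMap<>();
        oldVersion.put("id", 1);
        oldVersion.put("name", "a");

        // A record written with the evolved schema.
        Map<String, Object> newVersion = new LinkedHashMap<>();
        newVersion.put("id", 2);
        newVersion.put("name", "b");
        newVersion.put("email", "b@example.com");

        List<Map<String, Object>> records = Arrays.asList(oldVersion, newVersion);
        List<String> schema = unionSchema(records);

        System.out.println(schema);
        for (Map<String, Object> record : records) {
            System.out.println(conform(record, schema));
        }
    }
}
```

Because the POJO collection never needs an Avro coder, the schema only has to exist at the point of writing, which is what the side-input approach relies on.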