6
votes

I am building a pipeline that reads Avro generic records. To pass GenericRecord objects between stages I need to register an AvroCoder. The documentation says that when the type is GenericRecord, the schema argument can be arbitrary: https://beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/coders/AvroCoder.html#of-java.lang.Class-org.apache.avro.Schema-

However, when I pass an empty schema to the method AvroCoder.of(Class, Schema) it throws an exception at run time. Is there a way to create an AvroCoder for GenericRecord that does not require a schema? In my case, each GenericRecord has an embedded schema.

The exception and stacktrace:

Exception in thread "main" java.lang.NullPointerException
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.checkIndexedRecord(AvroCoder.java:562)
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.recurse(AvroCoder.java:430)
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.check(AvroCoder.java:409)
at org.apache.beam.sdk.coders.AvroCoder.<init>(AvroCoder.java:260)
at org.apache.beam.sdk.coders.AvroCoder.of(AvroCoder.java:141)
2
What is the exception that is thrown? - Kenn Knowles
@KennKnowles java.lang.NullPointerException, thrown from AvroCoder$AvroDeterminismChecker.checkIndexedRecord(AvroCoder.java:562); the full stack trace is in the question. - Nutel

2 Answers

1
votes

After reviewing the code for AvroCoder, I do not think the documentation is correct there. Your AvroCoder instance will need a way to figure out the schema for your Avro records - and likely the only way to do that is by providing one.

So, I'd recommend calling AvroCoder.of(GenericRecord.class, schema), where schema is the correct schema for the GenericRecord objects in your PCollection.

1
votes

I had a similar case and solved it with a custom coder. The simplest (though not the most efficient) solution is to encode the schema along with each record. If your schemas do not change often, you can benefit from caching one AvroCoder per schema.

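import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

public class GenericRecordCoder extends AtomicCoder<GenericRecord> {
    public static GenericRecordCoder of() {
        return new GenericRecordCoder();
    }
    // One cached AvroCoder per distinct schema, keyed by the schema hash.
    private static final ConcurrentHashMap<String, AvroCoder<GenericRecord>> avroCoders = new ConcurrentHashMap<>();

    // Assumed helper (getHash is not shown in the original post): MD5-based name UUID of the schema JSON.
    private static String getHash(String schemaString) {
        return UUID.nameUUIDFromBytes(schemaString.getBytes(StandardCharsets.UTF_8)).toString();
    }

    @Override
    public void encode(GenericRecord value, OutputStream outStream) throws IOException {
        String schemaString = value.getSchema().toString();
        String schemaHash = getHash(schemaString);
        // Wire layout: schema JSON, schema hash, then the Avro-encoded record.
        StringUtf8Coder.of().encode(schemaString, outStream);
        StringUtf8Coder.of().encode(schemaHash, outStream);
        AvroCoder<GenericRecord> coder = avroCoders.computeIfAbsent(schemaHash,
            s -> AvroCoder.of(value.getSchema()));
        coder.encode(value, outStream);
    }

    @Override
    public GenericRecord decode(InputStream inStream) throws IOException {
        String schemaString = StringUtf8Coder.of().decode(inStream);
        String schemaHash = StringUtf8Coder.of().decode(inStream);
        // The schema JSON is parsed only the first time this hash is seen.
        AvroCoder<GenericRecord> coder = avroCoders.computeIfAbsent(schemaHash,
            s -> AvroCoder.of(new Schema.Parser().parse(schemaString)));
        return coder.decode(inStream);
    }
}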
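
To make the wire layout and the effect of the cache easier to see, here is a minimal dependency-free sketch. It is not Beam code: the class name SchemaPrefixedFraming, its methods, and the use of plain strings in place of Avro schemas and records are all assumptions made for illustration. Each encoded value carries the schema text, its hash, and the payload, and the decoder does the "parse" step only on a cache miss.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the coder: strings replace Avro schemas and records.
class SchemaPrefixedFraming {
    // How many times a schema had to be "parsed"; the cache should keep this low.
    static final AtomicInteger parseCount = new AtomicInteger();
    // Stand-in for the coder cache: schema hash -> parsed schema.
    static final ConcurrentHashMap<String, String> parsedSchemas = new ConcurrentHashMap<>();

    static String hash(String schemaJson) {
        return UUID.nameUUIDFromBytes(schemaJson.getBytes(StandardCharsets.UTF_8)).toString();
    }

    // Writes three length-prefixed fields: schema JSON, schema hash, payload.
    static byte[] encode(String schemaJson, String payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(schemaJson);
            out.writeUTF(hash(schemaJson));
            out.writeUTF(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the fields back; the schema is "parsed" only on a cache miss.
    static String decode(byte[] encoded) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
            String schemaJson = in.readUTF();
            String schemaHash = in.readUTF();
            String schema = parsedSchemas.computeIfAbsent(schemaHash, h -> {
                parseCount.incrementAndGet();
                return schemaJson; // placeholder for real schema parsing
            });
            return schema + " -> " + in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Decoding many values that share a schema increments parseCount once, which is the saving the cache gives you. The schema text is still written with every value, so the per-record size overhead remains; the GenericRecordCoder above has the same property, with AvroCoder instances in the cache.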

While embedding the schema in every record solves the problem, I actually did it slightly differently, using an external schema registry (you can build one on top of a datastore, for example). In this case you don't need to serialize and deserialize the schema with every record. The code looks like this:

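import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

// SchemaRegistry is your own class (not part of Beam or Avro) backed by external storage.
public class GenericRecordCoder extends AtomicCoder<GenericRecord> {
    public static GenericRecordCoder of() {
        return new GenericRecordCoder();
    }
    // One cached AvroCoder per schema, keyed by the schema's full name.
    private static final ConcurrentHashMap<String, AvroCoder<GenericRecord>> avroCoders = new ConcurrentHashMap<>();

    @Override
    public void encode(GenericRecord value, OutputStream outStream) throws IOException {
        SchemaRegistry.registerIfAbsent(value.getSchema());
        String schemaName = value.getSchema().getFullName();
        // Only the schema name goes on the wire, followed by the Avro-encoded record.
        StringUtf8Coder.of().encode(schemaName, outStream);
        AvroCoder<GenericRecord> coder = avroCoders.computeIfAbsent(schemaName,
            s -> AvroCoder.of(value.getSchema()));
        coder.encode(value, outStream);
    }

    @Override
    public GenericRecord decode(InputStream inStream) throws IOException {
        String schemaName = StringUtf8Coder.of().decode(inStream);
        // The registry is consulted only the first time this name is seen.
        AvroCoder<GenericRecord> coder = avroCoders.computeIfAbsent(schemaName,
            s -> AvroCoder.of(SchemaRegistry.get(schemaName)));
        return coder.decode(inStream);
    }
}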

The usage is pretty straightforward:

PCollection<GenericRecord> inputCollection = pipeline
    .apply(AvroIO
           .parseGenericRecords(t -> t)
           .withCoder(GenericRecordCoder.of())
           .from(...));
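
The registry-based coder relies on a SchemaRegistry class that is not shown in the answer. As a rough illustration of the contract it needs (register once, look up by full name), here is a hypothetical in-memory sketch. The class name InMemorySchemaRegistry and its method signatures are assumptions, and it stores the schema as JSON text so that the example needs no dependencies.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in; the SchemaRegistry used in the answer is not shown.
class InMemorySchemaRegistry {
    // Schema JSON keyed by the schema's full name.
    private static final ConcurrentHashMap<String, String> schemasByName = new ConcurrentHashMap<>();

    // Stores the schema JSON under its full name unless one is already registered.
    static void registerIfAbsent(String fullName, String schemaJson) {
        schemasByName.putIfAbsent(fullName, schemaJson);
    }

    // Returns the registered schema JSON; fails loudly for unknown names.
    static String get(String fullName) {
        String schemaJson = schemasByName.get(fullName);
        if (schemaJson == null) {
            throw new IllegalStateException("No schema registered for " + fullName);
        }
        return schemaJson;
    }
}
```

Two caveats apply to any registry keyed this way. A static map lives in a single JVM, while on a distributed runner a record may be encoded on one worker and decoded on another, so a real registry needs shared storage, which is why the answer uses an external one. Also, because the key is the full name, two different versions of a schema with the same name collide and the first one registered wins.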