4 votes

I am trying to write to Confluent Kafka with Schema Registry from Flink using FlinkKafkaProducer010, but the error below is produced. I created a custom schema serializer (see the ConfluentAvroSerializationSchema class). The code compiles but fails at runtime. A complete code example that reproduces the error is at https://github.com/dmiljkovic/test-flink-schema-registry. All resources (the Kafka cluster and the schema registry) are mocked; the code is actually a test case.

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

public class ConfluentAvroSerializationSchema<T> implements SerializationSchema<T> {

    private static final long serialVersionUID = 1L;

    private final String topic;
    private final KafkaAvroSerializer avroSerializer;

    public ConfluentAvroSerializationSchema(String topic, KafkaAvroSerializer avroSerializer) {
        this.topic = topic;
        this.avroSerializer = avroSerializer;
    }

    @Override
    public byte[] serialize(T obj) {
        return avroSerializer.serialize(topic, obj);
    }
}

    // serialize avro
    KafkaAvroSerializer avroSerializer = new KafkaAvroSerializer(schemaRegistry);
    avroSerializer.configure(new HashMap(registryProp), false);
    ConfluentAvroSerializationSchema<TestRecordAvro> ser =
            new ConfluentAvroSerializationSchema<>(topic, avroSerializer);

    // write to Kafka
    FlinkKafkaProducer010.writeToKafkaWithTimestamps(ds, topic, ser, flinkProp);

org.apache.flink.api.common.InvalidProgramException: Object org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper@6e28bb87 is not serializable

at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:109)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.<init>(FlinkKafkaProducerBase.java:145)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09.<init>(FlinkKafkaProducer09.java:130)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.<init>(FlinkKafkaProducer010.java:227)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.writeToKafkaWithTimestamps(FlinkKafkaProducer010.java:137)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.writeToKafkaWithTimestamps(FlinkKafkaProducer010.java:115)
at com.acme.kafka_avro_util.TestProducer.testAvroConsumer(TestProducer.java:59)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.io.NotSerializableException: io.confluent.kafka.serializers.KafkaAvroSerializer
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:107)

2 Answers

3 votes

Flink has to serialize all operators (including your sink) in order to ship them to the task managers.

The problem is that the KafkaAvroSerializer you are using in your ConfluentAvroSerializationSchema class is not serializable at all, and that makes your sink non-serializable. You could initialize the KafkaAvroSerializer lazily: pass the schema-registry properties to ConfluentAvroSerializationSchema instead of a KafkaAvroSerializer instance, and create the KafkaAvroSerializer on the first call to serialize.
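A minimal sketch of that idea, reusing the class from the question but storing only the (serializable) schema-registry configuration. The Map<String, String> parameter and the exact property handling are assumptions, so adapt them to how registryProp is built in your test:

import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

public class ConfluentAvroSerializationSchema<T> implements SerializationSchema<T> {

    private static final long serialVersionUID = 1L;

    private final String topic;
    // Serializable configuration (must contain at least "schema.registry.url")
    private final HashMap<String, String> registryConfig;
    // Not serializable, therefore transient and created lazily on the task manager
    private transient KafkaAvroSerializer avroSerializer;

    public ConfluentAvroSerializationSchema(String topic, Map<String, String> registryConfig) {
        this.topic = topic;
        this.registryConfig = new HashMap<>(registryConfig);
    }

    @Override
    public byte[] serialize(T obj) {
        if (avroSerializer == null) {
            // First call on this parallel instance: build the Confluent serializer from the config
            avroSerializer = new KafkaAvroSerializer();
            avroSerializer.configure(registryConfig, false); // false = this is a value serializer
        }
        return avroSerializer.serialize(topic, obj);
    }
}

Because only the topic name and the configuration map are stored in fields, the closure cleaner can serialize the sink, and the actual KafkaAvroSerializer is only ever created on the task manager.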

3 votes

Change

private final KafkaAvroSerializer avroSerializer;

to

private final transient KafkaAvroSerializer avroSerializer;

and then initialize it in a setup method, something like:

@Setup
public void start() {
    this.avroSerializer = // initialization
}
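Depending on your Flink version, SerializationSchema may not offer such a setup/open hook, in which case the initialization becomes a lazy null check inside serialize(), as in the sketch under the first answer. Either way, the call site from the question then only passes configuration instead of a ready-made serializer; a hedged sketch, assuming registryProp is (or is copied into) a Map<String, String>:

// registryProp must contain at least "schema.registry.url"
ConfluentAvroSerializationSchema<TestRecordAvro> ser =
        new ConfluentAvroSerializationSchema<>(topic, registryProp);

// The schema (and therefore the sink) now holds only serializable state
FlinkKafkaProducer010.writeToKafkaWithTimestamps(ds, topic, ser, flinkProp);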