
I am playing with Kafka and the Streams API. I have created a custom serializer and deserializer for the KStream that I will use to receive messages from a given topic.

Now, the problem is that I am creating a serde this way:

JsonSerializer<EventMessage> serializer = new JsonSerializer<>();
JsonDeserializer<EventMessage> deserializer = new JsonDeserializer<>(EventMessage.class);
Serde<EventMessage> messageSerde = Serdes.serdeFrom(serializer, deserializer);

Serializer implementation:

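import java.nio.charset.StandardCharsets;
import java.util.Map;

import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerializer<T> implements Serializer<T> {

    private final Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public byte[] serialize(String topic, T data) {
        // Encode the JSON text explicitly as UTF-8
        return gson.toJson(data).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
    }
}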

Deserializer implementation:

import java.nio.charset.StandardCharsets;
import java.util.Map;

import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Deserializer;

public class JsonDeserializer<T> implements Deserializer<T> {

    private final Gson gson = new Gson();
    private Class<T> deserializedClass;

    public JsonDeserializer() {
    }

    public JsonDeserializer(Class<T> deserializedClass) {
        this.deserializedClass = deserializedClass;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> map, boolean b) {
        // Fall back to the "serializedClass" config entry when no class was passed in
        if (deserializedClass == null) {
            deserializedClass = (Class<T>) map.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        // Decode with the same charset the serializer used
        return gson.fromJson(new String(data, StandardCharsets.UTF_8), deserializedClass);
    }

    @Override
    public void close() {
    }
}

When I try to execute the code, I receive the following error:

Caused by: org.apache.kafka.common.KafkaException: Could not instantiate class org.apache.kafka.common.serialization.Serdes$WrapperSerde Does it have a public no-argument constructor?

Full dump here: https://pastebin.com/WwpuXuxB

This is how I am trying to use the serde:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, EventMessage> eventsStream = builder.stream(stringSerde, messageSerde, topic);

KStream<String, EventMessage> outStream = eventsStream
            .mapValues(value -> EventMessage.build(value.type, value.timestamp));

outStream.to("output");

Also, I am not totally sure I am setting up the properties correctly to configure the serializer and deserializer globally:

streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, messageSerde.getClass());
Can you check whether adding an explicit default (no-argument) constructor to JsonSerializer would help? (Michael G. Noll)
StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass() should be StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName(), for example. (Michael G. Noll)

3 Answers


To complement Matthias's answer, I've coded a simple example of how to create a custom Serde (Serializer/Deserializer) within a Kafka Streams app. It is available to clone and try at: https://github.com/Davidcorral94/Kafka-Streams-Custom-Seder

First, I create two classes, one for the Serializer and another for the Deserializer. In this case I use the Gson library to perform the serialization/deserialization.

Serializer

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Serializer;

// Serializer already extends Closeable (and therefore AutoCloseable)
public class PersonSerializer implements Serializer<Person> {

    private static final Charset CHARSET = StandardCharsets.UTF_8;
    private static final Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public byte[] serialize(String s, Person person) {
        // Transform the Person object to String
        String line = gson.toJson(person);
        // Return the bytes from the String 'line'
        return line.getBytes(CHARSET);
    }

    @Override
    public void close() {
    }
}

Deserializer

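import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Deserializer;

// Deserializer already extends Closeable (and therefore AutoCloseable)
public class PersonDeserializer implements Deserializer<Person> {

    private static final Charset CHARSET = StandardCharsets.UTF_8;
    private static final Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public Person deserialize(String topic, byte[] bytes) {
        // A null payload (e.g. a tombstone record) has no JSON to parse
        if (bytes == null) {
            return null;
        }
        try {
            // Transform the bytes to String
            String person = new String(bytes, CHARSET);
            // Return the Person object created from the String 'person'
            return gson.fromJson(person, Person.class);
        } catch (Exception e) {
            throw new IllegalArgumentException("Error reading bytes", e);
        }
    }

    @Override
    public void close() {
    }
}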
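Pinning the charset with a constant matters because new String(bytes) and String.getBytes() without a charset argument use the JVM's default charset, which is UTF-8 by default only from Java 18 on. The following JDK-only sketch isolates the decoding step (no Gson or Kafka involved); CharsetRoundTrip and decodeUtf8 are hypothetical names used only for illustration, and the null check reflects that a record's value bytes may be null.

```java
import java.nio.charset.StandardCharsets;

public class CharsetRoundTrip {

    // Hypothetical helper: decodes a record value the way a JSON deserializer would,
    // treating a null payload (e.g. a tombstone) as null instead of failing.
    static String decodeUtf8(byte[] data) {
        if (data == null) {
            return null;
        }
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "\u00e9" is an e with an acute accent, encoded as two bytes in UTF-8
        String json = "{\"name\":\"Jos\u00e9\"}";
        byte[] bytes = json.getBytes(StandardCharsets.UTF_8);

        // Decoding with the charset used for encoding restores the original text: true
        System.out.println(json.equals(decodeUtf8(bytes)));
        // Decoding with a different charset garbles the non-ASCII character: false
        System.out.println(json.equals(new String(bytes, StandardCharsets.ISO_8859_1)));
        // A null payload stays null: true
        System.out.println(decodeUtf8(null) == null);
    }
}
```

Using the same explicit charset on both the serializer and the deserializer side keeps the round trip independent of where each application happens to run.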

Then, I wrap both of them in a Serde so that I can use it in my Kafka Streams app.

Serde

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class PersonSerde implements Serde<Person> {
    private final PersonSerializer serializer = new PersonSerializer();
    private final PersonDeserializer deserializer = new PersonDeserializer();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        serializer.configure(configs, isKey);
        deserializer.configure(configs, isKey);
    }

    @Override
    public void close() {
        serializer.close();
        deserializer.close();
    }

    @Override
    public Serializer<Person> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<Person> deserializer() {
        return deserializer;
    }
}

Finally, you can use this Serde class in your Kafka Streams app with the following line:

props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PersonSerde.class);

This works with the latest Kafka version available at the time of writing, 1.0.0.


If you call Serdes.serdeFrom(...), you get back a Serdes$WrapperSerde, which is meant for internal use and does not have a no-argument constructor. There is currently no API you can call to get a custom Serde class. Instead, you need to implement your own Serde class and wrap your serializer and deserializer "manually".

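import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class EventMessageSerde implements Serde<EventMessage> {
    private final JsonSerializer<EventMessage> serializer;
    private final JsonDeserializer<EventMessage> deserializer;

    // Kafka Streams creates the configured class itself, so it needs a public no-argument constructor
    public EventMessageSerde() {
        this.serializer = new JsonSerializer<>();
        this.deserializer = new JsonDeserializer<>(EventMessage.class);
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        serializer.configure(configs, isKey);
        deserializer.configure(configs, isKey);
    }

    @Override
    public void close() {
        serializer.close();
        deserializer.close();
    }

    @Override
    public Serializer<EventMessage> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<EventMessage> deserializer() {
        return deserializer;
    }
}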

In your Properties you can set:

streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, EventMessageSerde.class);
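Configuring a serde by class works only because Kafka creates the instance reflectively from the class alone, which is why the error message asks whether the class has a public no-argument constructor. The following JDK-only sketch illustrates that mechanism; it does not call Kafka, and WrapperLike, NoArgLike and canInstantiate are hypothetical stand-ins for Serdes.WrapperSerde, a hand-written serde such as EventMessageSerde, and Kafka's internal instantiation step.

```java
import java.lang.reflect.InvocationTargetException;

public class NoArgConstructorDemo {

    // Hypothetical stand-in for Serdes.WrapperSerde: its only constructor takes arguments.
    static class WrapperLike {
        private final Object serializer;
        private final Object deserializer;

        WrapperLike(Object serializer, Object deserializer) {
            this.serializer = serializer;
            this.deserializer = deserializer;
        }
    }

    // Hypothetical stand-in for a custom serde with a public no-argument constructor.
    public static class NoArgLike {
        public NoArgLike() {
        }
    }

    // Creates an instance from nothing but the Class object, as a class-based config requires.
    static boolean canInstantiate(Class<?> clazz) {
        try {
            clazz.getDeclaredConstructor().newInstance();
            return true;
        } catch (NoSuchMethodException | InstantiationException
                 | IllegalAccessException | InvocationTargetException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Nested classes get a binary name containing '$', as in "Serdes$WrapperSerde"
        System.out.println(WrapperLike.class.getName());
        System.out.println(canInstantiate(WrapperLike.class)); // false: no no-arg constructor
        System.out.println(canInstantiate(NoArgLike.class));   // true
    }
}
```

This is also why messageSerde.getClass() fails as a config value while EventMessageSerde.class works: only the latter can be constructed without arguments.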

Another way is to use StreamsBuilder instead of KStreamBuilder, which is deprecated in 1.0.0. You can pass the serde object directly with Consumed.with when creating the stream, so you do not need to create a custom Serde class in this scenario.

Serde<EventMessage> messageSerde = Serdes.serdeFrom(serializer, deserializer);

StreamsBuilder builder = new StreamsBuilder();
KStream<String, EventMessage> eventsStream = builder.stream(topic, Consumed.with(Serdes.String(), messageSerde));

In the configuration below, you can keep StringSerde as the default value serde instead of using messageSerde.getClass(). That call fails because messageSerde is just a WrapperSerde, which does not have a no-argument constructor.

streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());