I'm writing an app that uses Kafka Streams. It reads from topic A, applies some transformations, and writes to topic B. During the transformations the values are grouped by key, so the output key and value types differ from the input types. Kafka Streams uses a Serde of a specific type (e.g. the String serde serializes and deserializes strings) for both serialization and deserialization, so a single serde won't work once the data is transformed. How can I define different serializers and deserializers within the Streams API?

1 Answer

Sure, you can.

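Whenever you create a stream, call groupBy/groupByKey, or write output to a topic, you can pass the serdes for that step explicitly via Consumed, Serialized, or Produced, so each step can use different types. (Serialized was deprecated in Kafka 2.1 in favor of Grouped, which is used the same way.) Example: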

// Serdes for the input topic: String keys and String values.
Serde<String> stringSerde = Serdes.String();
Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
// Serdes for the output topic: String keys, YourCustomItem values written as JSON.
Produced<String, YourCustomItem> produced = Produced.with(stringSerde, new JsonSerde<>(YourCustomItem.class));

KStream<String, String> kStream = streamsBuilder.stream("sourceTopicName", consumed);
// The value type changes here, so every later step needs a YourCustomItem serde.
KStream<String, YourCustomItem> transformedKStream = kStream.mapValues((key, value) -> new YourCustomItem());
transformedKStream.to("destinationTopicName", produced);

// Grouping takes its own serdes too.
transformedKStream.groupByKey(Serialized.with(Serdes.String(), new JsonSerde<>(YourCustomItem.class)));

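Here JsonSerde comes from the spring-kafka dependency. Alternatively, you can build a Serde from a separate serializer and deserializer with Serdes.serdeFrom (the JsonNode-typed JsonSerializer and JsonDeserializer below match the classes in Kafka's connect-json module):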

Serializer<JsonNode> jsonSerializer = new JsonSerializer();
Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
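
If no ready-made serde fits the grouped output type, the serializer and deserializer passed to Serdes.serdeFrom can be written by hand. Below is a minimal sketch of the byte-level logic for a List<String> value (e.g. all values collected for one key), written without the Kafka dependency so it runs on its own. The class name StringListCodec and the wire format (an item count followed by length-prefixed strings) are assumptions made for illustration, not something Kafka prescribes. The two methods only mirror the shape of Kafka's Serializer#serialize(String topic, T data) and Deserializer#deserialize(String topic, byte[] data).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical codec: encodes a List<String> as [int count][UTF string]...
// Limits of this sketch: list items must be non-null, and each item must fit
// DataOutputStream.writeUTF's 65535-byte limit.
final class StringListCodec {

    // Same parameters as Kafka's Serializer#serialize; the topic is unused here.
    static byte[] serialize(String topic, List<String> data) {
        if (data == null) {
            return null; // keep null values as null instead of encoding them
        }
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(data.size());
            for (String item : data) {
                out.writeUTF(item);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Same parameters as Kafka's Deserializer#deserialize; reverses serialize.
    static List<String> deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            List<String> items = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                items.add(in.readUTF());
            }
            return items;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a real application these two methods would sit behind classes implementing Kafka's Serializer<List<String>> and Deserializer<List<String>>, and the resulting pair would be combined with Serdes.serdeFrom and handed to Produced.with or Materialized.with for the step that emits the grouped values.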