I'm writing an app that uses Kafka Streams. It reads from topic A, applies some transformations, and writes to topic B. During the transformations the values are grouped by key, so the output key and value types differ from the input types. Kafka Streams uses Serdes of a specific type (e.g. the String Serde serializes and deserializes strings) for both serialization and deserialization, so a single Serde won't work once the data has been transformed. How can I define different serializers and deserializers within the Streams API?
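For context, the single serde type the question describes usually comes from the application-wide default serdes in the Streams configuration. Those defaults are used by every operation that is not given an explicit serde, which is why String defaults break as soon as a step produces another type. A minimal sketch of such a configuration; the class name, application id and broker address are hypothetical placeholders:

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

public class DefaultSerdesConfig {
    // Defaults apply to every source, grouping and sink that is NOT given its own serde.
    public static Properties props() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");       // hypothetical id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }
}
```

Keeping String defaults is harmless as long as each step whose types differ overrides them explicitly.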
Answer
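Sure, you can. Whenever you create a stream, call `groupBy`/`groupByKey`, or write output to a topic, you can pass the serdes to use for that step: `Consumed` for the source, `Serialized` (replaced by `Grouped` since Kafka 2.1) for grouping, and `Produced` for the sink. Example: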
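```java
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.kafka.support.serializer.JsonSerde;
StreamsBuilder streamsBuilder = new StreamsBuilder();
Serde<String> stringSerde = Serdes.String();
Serde<YourCustomItem> itemSerde = new JsonSerde<>(YourCustomItem.class); // YourCustomItem: your own POJO
// Read String keys and String values from the source topic
KStream<String, String> kStream = streamsBuilder.stream("sourceTopicName", Consumed.with(stringSerde, stringSerde));
// mapValues changes the value type, so downstream steps need a serde for YourCustomItem
KStream<String, YourCustomItem> transformedKStream = kStream.mapValues((key, value) -> new YourCustomItem());
transformedKStream.to("destinationTopicName", Produced.with(stringSerde, itemSerde));
// Grouped replaces Serialized.with(...), which is deprecated since Kafka 2.1 and removed in 3.0
KGroupedStream<String, YourCustomItem> grouped = transformedKStream.groupByKey(Grouped.with(stringSerde, itemSerde));
```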
where `JsonSerde` comes from the spring-kafka dependency.
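The same idea covers the case from the question where grouping changes the key type. A minimal sketch, assuming String/String input and a hypothetical regrouping by value length, so the repartition step, the count store and the output topic each get serdes for their own types (class name, topic names and grouping rule are illustrative only):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class RegroupTopology {
    // Hypothetical topology: String/String in, Integer/Long out.
    public static Topology build(String sourceTopic, String sinkTopic) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input =
            builder.stream(sourceTopic, Consumed.with(Serdes.String(), Serdes.String()));
        KTable<Integer, Long> counts = input
            // The new key is an Integer, so the repartition topic needs Integer/String serdes
            .groupBy((key, value) -> value.length(), Grouped.with(Serdes.Integer(), Serdes.String()))
            // The count state store holds Integer keys and Long values
            .count(Materialized.with(Serdes.Integer(), Serdes.Long()));
        // The output topic gets the final Integer/Long types
        counts.toStream().to(sinkTopic, Produced.with(Serdes.Integer(), Serdes.Long()));
        return builder.build();
    }
}
```

Each operator only needs serdes for the types it reads or writes, so the types can change freely between steps.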
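Alternatively, you can build a `Serde` from a separate serializer and deserializer, for example for Jackson's `JsonNode` (here `JsonSerializer`/`JsonDeserializer` are assumed to be the `JsonNode` classes from Kafka's `connect-json` artifact, not the spring-kafka ones):
```java
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.connect.json.*;
Serializer<JsonNode> jsonSerializer = new JsonSerializer();
Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
```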
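To sanity-check a serde built with `Serdes.serdeFrom` like the one above, a sketch can serialize a value and read it back, which is what Kafka Streams does on every write and read. This assumes the `connect-json` artifact (and with it Jackson) is on the classpath; the class name and the topic string are hypothetical:

```java
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;

public class JsonNodeSerdeCheck {
    // One Serde wrapping connect-json's JsonNode serializer and deserializer
    public static Serde<JsonNode> jsonSerde() {
        return Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
    }

    // Serialize a JsonNode to bytes and deserialize it again
    public static JsonNode roundTrip(JsonNode node) {
        Serde<JsonNode> serde = jsonSerde();
        byte[] bytes = serde.serializer().serialize("any-topic", node);
        return serde.deserializer().deserialize("any-topic", bytes);
    }
}
```

Such a serde can then be passed anywhere a serde is accepted, e.g. `Consumed.with(Serdes.String(), jsonSerde)`.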