I am trying to perform a count operation on a KStream and am having difficulty understanding how serialization works here. I have a stream that publishes person records, e.g. name and age. After consuming this stream, I am trying to create a KTable that holds the number of people per age.
Input: {"name" : "abc","age" : "15"}
Output (age, count):
30, 10
20, 4
10, 8
35, 22
...
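To make the intended result concrete, here is a minimal plain-Java sketch of the same count-per-age semantics, with no Kafka involved. The class name AgeCountSketch, its nested Person stand-in, and the sample records are assumptions made up for this illustration; only the first record comes from the input above.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java sketch of the intended result (no Kafka involved).
// AgeCountSketch and its nested Person are hypothetical names for illustration.
class AgeCountSketch {

    // Minimal stand-in for the Person value carried in the stream.
    static final class Person {
        final String name;
        final String age;

        Person(String name, String age) {
            this.name = name;
            this.age = age;
        }

        String getAge() {
            return age;
        }
    }

    // Groups people by age and counts how many share each age.
    // A TreeMap keeps the keys sorted so the printed result is deterministic.
    static Map<String, Long> countByAge(List<Person> people) {
        return people.stream()
                .collect(Collectors.groupingBy(Person::getAge, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Person> people = List.of(
                new Person("abc", "15"),   // from the sample input
                new Person("def", "15"),   // made-up record
                new Person("ghi", "30"));  // made-up record
        System.out.println(countByAge(people)); // {15=2, 30=1}
    }
}
```

The KTable I am after should hold the same kind of age-to-count mapping, updated continuously as records arrive.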
Properties
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "person_processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
Processor
KStream<Object, Person> people = builder.stream("people");
people.print(Printed.<Object, Person>toSysOut().withLabel("consumer-1"));
Output [consumer-1]: null, [B@7e37bab6
Question-1 I understand that data in the topic is in bytes. I am not setting any Serdes for Key or Value to start with. Is KStream converting the input from bytes to Person and printing the address of Person here?
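For reference, a quick check in plain Java (outside Kafka) shows that the `[B@...` form is what Java prints for a byte[], which suggests the printed value is the raw byte array rather than a Person. The class name ByteArrayPrintSketch and the helper describe are hypothetical, introduced only for this check.

```java
import java.nio.charset.StandardCharsets;

// Plain-Java check of what "[B@..." means (no Kafka involved).
// ByteArrayPrintSketch is a hypothetical name used only for this illustration.
class ByteArrayPrintSketch {

    // Returns the text that println-style code would show for an arbitrary object.
    static String describe(Object value) {
        return String.valueOf(value);
    }

    public static void main(String[] args) {
        byte[] raw = "{\"name\" : \"abc\",\"age\" : \"15\"}".getBytes(StandardCharsets.UTF_8);

        // A byte[] has no custom toString(), so this prints "[B@" followed by a hash in hex.
        System.out.println(describe(raw));

        // Decoding the same bytes as UTF-8 gives back the JSON text.
        System.out.println(new String(raw, StandardCharsets.UTF_8));
    }
}
```

The hash after `[B@` differs from run to run, so only the prefix is meaningful.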
Question-2 When I add the below value Serdes, I get a more meaningful output. Is the byte information here getting converted to String and then to Person? Why is the value now printed correctly?
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
[consumer-1]: null, {"name" : "abc","age" : "15"}
Question-3 Now, when performing the count on the age, I get a runtime error on converting a String to Person. If groupBy is setting the age as the Key and the count as Long, why is the String to Person conversion happening?
KTable<Integer, Long> integerLongKTable = people.groupBy((key, value) -> value.getAge())
.count();
Exception in thread "person_processor-9ff96b38-4beb-4594-b2fe-ae191bf6b9ff-StreamThread-1" java.lang.ClassCastException: java.lang.String cannot be cast to com.example.kafkastreams.KafkaStreamsApplication$Person
at org.apache.kafka.streams.kstream.internals.KStreamImpl$1.apply(KStreamImpl.java:152)
at org.apache.kafka.streams.kstream.internals.KStreamImpl$1.apply(KStreamImpl.java:149)
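One possible reading of this trace, sketched in plain Java rather than with the Kafka Streams API: the `Person` in `KStream<Object, Person>` is only a compile-time label, so a value that was deserialized as a String can travel through generic code unnoticed until the lambda calls `value.getAge()`. The names ErasureSketch, unsafeSelectKey and failsWithClassCast are hypothetical and are not part of Kafka Streams.

```java
import java.util.function.BiFunction;

// Plain-Java sketch of how a generic type parameter can hide a wrong runtime type.
// ErasureSketch, Person and unsafeSelectKey are hypothetical names, not Kafka Streams API.
class ErasureSketch {

    static final class Person {
        final String age;

        Person(String age) {
            this.age = age;
        }

        String getAge() {
            return age;
        }
    }

    // Mimics a library handing an untyped, already-deserialized value to typed user code.
    @SuppressWarnings("unchecked")
    static <V, K> K unsafeSelectKey(Object deserialized, BiFunction<Object, V, K> selector) {
        // The unchecked cast to V is erased at runtime, so nothing fails on this line.
        return selector.apply(null, (V) deserialized);
    }

    // Returns true if applying a Person-typed selector to the value throws ClassCastException.
    static boolean failsWithClassCast(Object deserialized) {
        try {
            ErasureSketch.<Person, String>unsafeSelectKey(deserialized, (key, value) -> value.getAge());
            return false;
        } catch (ClassCastException e) {
            // The failure surfaces only when the lambda treats the value as a Person.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsWithClassCast("{\"name\" : \"abc\",\"age\" : \"15\"}")); // true
        System.out.println(failsWithClassCast(new Person("15")));                       // false
    }
}
```

If this reading is right, the conversion that fails is not done by groupBy or count; it is the implicit cast made when the key selector receives a value that the String serde produced.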
Edit-1
After reading through the response from @Matthias J. Sax, I created a PersonSerde using the Serializer and Deserializer from this location, but I now get this SerializationException...
static class Person {
    String name;
    String age;

    public Person(String name, String age) {
        this.name = name;
        this.age = age;
    }

    void setName(String name) {
        this.name = name;
    }

    String getName() {
        return name;
    }

    void setAge(String age) {
        this.age = age;
    }

    String getAge() {
        return age;
    }

    @Override
    public String toString() {
        return "Person {name:" + this.getName() + ",age:" + this.getAge() + "}";
    }
}
public class PersonSerde implements Serde<Person> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Nothing to configure at the Serde level.
    }

    @Override
    public void close() {
        // No resources to release.
    }

    @Override
    public Serializer<Person> serializer() {
        // Pass the target class under the "JsonPOJOClass" key.
        Map<String, Object> serdeProps = new HashMap<>();
        final Serializer<Person> personSerializer = new JsonPOJOSerializer<>();
        serdeProps.put("JsonPOJOClass", Person.class);
        personSerializer.configure(serdeProps, false);
        return personSerializer;
    }

    @Override
    public Deserializer<Person> deserializer() {
        // Pass the target class under the "JsonPOJOClass" key.
        Map<String, Object> serdeProps = new HashMap<>();
        final Deserializer<Person> personDeserializer = new JsonPOJODeserializer<>();
        serdeProps.put("JsonPOJOClass", Person.class);
        personDeserializer.configure(serdeProps, false);
        return personDeserializer;
    }
}
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, personSerde.getClass());
KTable<String, Long> count = people
        .selectKey((key, value) -> value.getAge())
        .groupByKey(Serialized.with(Serdes.String(), personSerde))
        .count();
Error
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing JSON message
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class com.example.kafkastreams.KafkaStreamsApplication$Person and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS)
at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1191)
at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:313)
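A possible lead on the "no properties discovered" part, assuming Jackson's default auto-detection only considers public getters and public fields: the getters in Person are package-private, so none of them would qualify. The sketch below does not use Jackson at all; it only applies plain reflection to approximate that rule. The names GetterVisibilitySketch, PackagePrivatePerson, PublicGetterPerson and publicGetters are hypothetical.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Reflection-only approximation of "which getters are public"; this is NOT Jackson's code.
// All names here are hypothetical and exist only for this illustration.
class GetterVisibilitySketch {

    // Same shape as the Person in this question: package-private fields and getters.
    static class PackagePrivatePerson {
        String name;
        String age;

        String getName() { return name; }
        String getAge() { return age; }
    }

    // Variant with public getters, for comparison.
    static class PublicGetterPerson {
        String name;
        String age;

        public String getName() { return name; }
        public String getAge() { return age; }
    }

    // Lists the public, no-argument, non-void getXxx methods declared directly on the class.
    static List<String> publicGetters(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Method m : type.getDeclaredMethods()) {
            boolean looksLikeGetter = m.getName().startsWith("get")
                    && m.getName().length() > 3
                    && m.getParameterCount() == 0
                    && m.getReturnType() != void.class;
            if (looksLikeGetter && Modifier.isPublic(m.getModifiers())) {
                names.add(m.getName());
            }
        }
        Collections.sort(names); // getDeclaredMethods() has no guaranteed order
        return names;
    }

    public static void main(String[] args) {
        System.out.println(publicGetters(PackagePrivatePerson.class)); // []
        System.out.println(publicGetters(PublicGetterPerson.class));   // [getAge, getName]
    }
}
```

If the assumption about Jackson's defaults holds, an empty list here would match the "no properties discovered" wording in the exception.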
Edit 5
So it appears that count works correctly when I first use mapValues to turn the value into a String, but fails when the value is still my custom Person object.
KStream<String, Person> people = builder.stream("person-topic", Consumed.with(Serdes.String(), personSerde));
people.print(Printed.<String, Person>toSysOut().withLabel("person-source"));
KStream<String, Person> agePersonKStream = people.selectKey((key, value) -> value.getAge());
agePersonKStream.print(Printed.<String, Person>toSysOut().withLabel("age-person"));
KStream<String, String> stringStringKStream = agePersonKStream.mapValues((person -> person.name));
stringStringKStream.print(Printed.<String, String>toSysOut().withLabel("age-name"));
KTable<String, Long> stringLongKTable = stringStringKStream.groupByKey(Serialized.with(Serdes.String(), Serdes.String())).count();
stringLongKTable.toStream().print(Printed.<String, Long>toSysOut().withLabel("age-count"));
Without the third step (mapValues to the name), the fourth step (groupByKey and count) fails.