
I am trying to perform a count operation on a KStream and am having some difficulty understanding how serialization works here. I have a stream that pushes people information, e.g. name and age. After consuming this stream, I am trying to create a KTable with a count of people per age.

Input: {"name" : "abc","age" : "15"}

Output (age, count):
30, 10
20, 4
10, 8
35, 22
...

Properties

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "person_processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

Processor

KStream<Object, Person> people = builder.stream("people");
people.print(Printed.<Object, Person>toSysOut().withLabel("consumer-1"));

Output [consumer-1]: null, [B@7e37bab6

Question-1 I understand that data in the topic is in bytes. I am not setting any Serdes for Key or Value to start with. Is KStream converting the input from bytes to Person and printing the address of Person here?

Question-2 When I add the value Serde below, I get a more meaningful output. Are the bytes here converted to String and then to Person? Why is the value now printed correctly?

props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

[consumer-1]: null, {"name" : "abc","age" : "15"}

Question-3 Now, when performing the count on the age, I get a runtime error on converting a String to Person. If groupBy is setting the age as the Key and the count as Long, why is the String to Person conversion happening?

KTable<Integer, Long> integerLongKTable = people.groupBy((key, value) -> value.getAge())
    .count();

Exception in thread "person_processor-9ff96b38-4beb-4594-b2fe-ae191bf6b9ff-StreamThread-1" java.lang.ClassCastException: java.lang.String cannot be cast to com.example.kafkastreams.KafkaStreamsApplication$Person
at org.apache.kafka.streams.kstream.internals.KStreamImpl$1.apply(KStreamImpl.java:152)
at org.apache.kafka.streams.kstream.internals.KStreamImpl$1.apply(KStreamImpl.java:149)

Edit-1

After reading through the response from @Matthias J. Sax, I created a PersonSerde using the Serializer and Deserializer from this location, and now I get a SerializationException (see below):

https://github.com/apache/kafka/tree/1.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview

static class Person {

    String name;
    String age;

    public Person(String name, String age) {

      this.name = name;
      this.age = age;
    }

    void setName(String name) {

      this.name = name;
    }

    String getName() {

      return name;
    }

    void setAge(String age) {

      this.age = age;
    }

    String getAge() {

      return age;
    }

    @Override
    public String toString() {

      return "Person {name:" + this.getName() + ",age:" + this.getAge() + "}";
    }
  }

public class PersonSerde implements Serde<Person> {

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {

  }

  @Override
  public void close() {

  }

  @Override
  public Serializer<Person> serializer() {

    Map<String, Object> serdeProps = new HashMap<>();

    final Serializer<Person> personSerializer = new JsonPOJOSerializer<>();
    serdeProps.put("JsonPOJOClass", Person.class);
    personSerializer.configure(serdeProps, false);

    return personSerializer;
  }

  @Override
  public Deserializer<Person> deserializer() {

    Map<String, Object> serdeProps = new HashMap<>();

    final Deserializer<Person> personDeserializer = new JsonPOJODeserializer<>();
    serdeProps.put("JsonPOJOClass", Person.class);
    personDeserializer.configure(serdeProps, false);

    return personDeserializer;
  }
}

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, personSerde.getClass());

KTable<String, Long> count = people.selectKey((key, value) -> value.getAge()).groupByKey(Serialized.with(Serdes.String(), personSerde))
      .count();

Error

Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing JSON message
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class com.example.kafkastreams.KafkaStreamsApplication$Person and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS)
at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1191)
at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:313)

Edit 5

So it appears that when I use mapValues() to map the value to a String, count() works correctly, but when the value is a custom object, it fails:

KStream<String, Person> people = builder.stream("person-topic", Consumed.with(Serdes.String(), personSerde));
people.print(Printed.<String, Person>toSysOut().withLabel("person-source"));

KStream<String, Person> agePersonKStream = people.selectKey((key, value) -> value.getAge());
agePersonKStream.print(Printed.<String, Person>toSysOut().withLabel("age-person"));

KStream<String, String> stringStringKStream = agePersonKStream.mapValues((person -> person.name));
stringStringKStream.print(Printed.<String, String>toSysOut().withLabel("age-name"));

KTable<String, Long> stringLongKTable = stringStringKStream.groupByKey(Serialized.with(Serdes.String(), Serdes.String())).count();
stringLongKTable.toStream().print(Printed.<String, Long>toSysOut().withLabel("age-count"));

Without step 3 (mapValues() to the name), step 4 (groupByKey().count()) fails.


Answer


Question-1 I understand that data in the topic is in bytes. I am not setting any Serdes for Key or Value to start with. Is KStream converting the input from bytes to Person and printing the address of Person here?

If you don't specify any Serde in StreamsConfig or in builder.stream(..., Consumed.with(/*serdes*/)), the bytes won't be converted into a Person object; instead, the value will be of type byte[] (in Kafka 1.x the default serde is the byte-array serde). Thus, print() will call byte[].toString(), which results in the cryptic output ([B@7e37bab6) you see.
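A minimal stdlib-only sketch (no Kafka involved) of what print() effectively does with an unconverted value: arrays don't override toString(), so Object.toString() produces the JVM type name ("[B" for byte[]), an "@", and the hash code in hex. The class name ByteArrayToStringDemo is purely illustrative.

```java
import java.nio.charset.StandardCharsets;

// Illustrative only: the record value stays a byte[] when no serde converts it.
class ByteArrayToStringDemo {

    // byte[] inherits Object.toString(): "[B" + "@" + Integer.toHexString(hashCode())
    static String describe(byte[] value) {
        return value.toString();
    }

    public static void main(String[] args) {
        byte[] raw = "{\"name\" : \"abc\",\"age\" : \"15\"}".getBytes(StandardCharsets.UTF_8);
        // Prints something like [B@7e37bab6; the hex part differs from run to run
        System.out.println(describe(raw));
    }
}
```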

Question-2 When I add the below value Serdes, I get a more meaningful output. Is the byte information here getting converted to String and then to Person? Why is the value now printed correctly?

As you specify Serdes.String() in StreamsConfig, the bytes are converted to the String type. StringSerde is able to deserialize the bytes in a meaningful way, but it is more or less a coincidence that this works at all: your data seems to be serialized as JSON, which is text, and that would explain why Serdes.String() can convert the bytes into a readable String.
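As a rough stdlib approximation (assuming the String deserializer's default UTF-8 encoding), the conversion amounts to decoding the bytes as text. Because JSON is text, the result prints nicely, but the value is still a java.lang.String, not a Person. StringSerdeDemo and deserializeAsString are hypothetical names used only for this sketch.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical approximation of a String deserializer, assuming UTF-8 encoding
class StringSerdeDemo {

    static Object deserializeAsString(byte[] data) {
        // Null values (e.g. tombstones) stay null; otherwise decode the bytes as UTF-8 text
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String json = "{\"name\" : \"abc\",\"age\" : \"15\"}";
        Object value = deserializeAsString(json.getBytes(StandardCharsets.UTF_8));
        System.out.println(value);                      // {"name" : "abc","age" : "15"}
        System.out.println(value.getClass().getName()); // java.lang.String
    }
}
```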

Question-3 Now, when performing the count on the age, I get a runtime error on converting a String to Person. If groupBy is setting the age as the Key and the count as Long, why is the String to Person conversion happening?

That is expected. Because the bytes are converted into a String object (as you specified Serdes.String()), the cast to Person cannot be performed.

Final remarks:

You don't get a ClassCastException if you only use print(), because in that case no cast operation is performed. Java only inserts a cast operation where it is required.

For groupBy() you use value.getAge(), and thus Java inserts a cast here (the compiler knows that the expected type is Person, because it is specified via KStream<Object, Person> people = ...). For print(), only toString() is called, which is defined on Object, and thus no cast is required.

Generics in Java are type hints for the compiler: type parameters are replaced with Object, and casts are inserted at compile time where required. Thus, for print(), an Object variable can point to a byte[] without problem, and toString() is called successfully. In the groupBy() case, the compiler casts Object to Person to be able to call getAge(); however, this fails, because the actual type is String.
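The erasure behavior can be reproduced without Kafka. In this stdlib-only sketch, a list declared as List<Person> actually holds a String (standing in for what Serdes.String() produces). A generic "print-like" method only needs Object.toString(), so no cast is inserted and it works; a "groupBy-like" method calls getAge(), so the compiler inserts a cast to Person, which fails at runtime. ErasureDemo, its nested Person, printLike and groupByLike are hypothetical names for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

class ErasureDemo {

    // Hypothetical stand-in for the question's Person class
    static class Person {
        private final String age;
        Person(String age) { this.age = age; }
        String getAge() { return age; }
    }

    @SuppressWarnings("unchecked")
    static List<Person> mistypedValues() {
        List<Object> raw = new ArrayList<>();
        raw.add("{\"name\" : \"abc\",\"age\" : \"15\"}"); // really a String at runtime
        return (List<Person>) (List<?>) raw;             // unchecked: the compiler trusts us
    }

    // Like print(): generic code erased to Object, only toString() is used, so no cast
    static <V> String printLike(List<V> values) {
        V value = values.get(0);
        return String.valueOf(value);
    }

    // Like groupBy((key, value) -> value.getAge()): a cast to Person is inserted here
    static String groupByLike(List<Person> values) {
        return values.get(0).getAge(); // throws ClassCastException: String is not a Person
    }

    public static void main(String[] args) {
        List<Person> values = mistypedValues();
        System.out.println(printLike(values)); // prints the JSON text without error
        try {
            groupByLike(values);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```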

To get your code working, you need to create a PersonSerde class that implements Serde<Person> and specify it as the value serde.
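Regarding the SerializationException in Edit-1: the JsonPOJOSerializer/JsonPOJODeserializer from the linked pageview example are built on Jackson's ObjectMapper. With Jackson's default visibility rules, only public getters and public fields are auto-detected for serialization, and the Person class in the question has package-private getters and fields, which matches the "no properties discovered" message. A minimal sketch of a Jackson-friendly Person, assuming a plain ObjectMapper with default settings: public getters and setters, plus a public no-arg constructor, because a plain ObjectMapper does not use an unannotated constructor with arguments when deserializing.

```java
// Sketch of a Jackson-friendly Person, assuming default ObjectMapper settings
// (as used by the JsonPOJOSerializer/JsonPOJODeserializer from the pageview example)
class Person {

    private String name;
    private String age;

    // Public no-arg constructor: lets Jackson instantiate the object, then call the setters
    public Person() {
    }

    public Person(String name, String age) {
        this.name = name;
        this.age = age;
    }

    // Public getters are what Jackson's default visibility rules detect when serializing
    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAge() {
        return age;
    }

    public void setAge(String age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Person {name:" + name + ",age:" + age + "}";
    }
}
```

If this visibility issue is indeed the cause, it may also explain Edit 5: selectKey() followed by groupByKey() writes the values to a repartition topic, and after mapValues() those values are Strings, so Jackson never has to serialize a Person.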