2
votes

I am producing input data as a JSON string.

For Topic - myinput

{"time":"2017-11-28T09:42:26.776Z","name":"Lane1","oclass":"myClass","id":112553,"Scope":"198S"}

My class looks like this:

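import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class App {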
    static public class CountryMessage {

        public String time;
        public String Scope;
        public String id;
        public String oclass;
        public String name; 
    }

    private static final String APP_ID = "countries-streaming-analysis-app";

    public static void main(String[] args) {
        System.out.println("Kafka Streams Demonstration");


        StreamsConfig config = new StreamsConfig(getProperties());
        final Serde < String > stringSerde = Serdes.String();
        final Serde < Long > longSerde = Serdes.Long();

        Map < String, Object > serdeProps = new HashMap < > ();
        final Serializer < CountryMessage > countryMessageSerializer = new JsonPOJOSerializer < > ();
        serdeProps.put("JsonPOJOClass", CountryMessage.class);
        countryMessageSerializer.configure(serdeProps, false);

        final Deserializer < CountryMessage > countryMessageDeserializer = new JsonPOJODeserializer < > ();
        serdeProps.put("JsonPOJOClass", CountryMessage.class);
        countryMessageDeserializer.configure(serdeProps, false);
        final Serde < CountryMessage > countryMessageSerde = Serdes.serdeFrom(countryMessageSerializer,countryMessageDeserializer);

        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KStream<String, CountryMessage> countriesStream = kStreamBuilder.stream(stringSerde, countryMessageSerde, "vanitopic");

        KGroupedStream<String, CountryMessage> countries = countriesStream.selectKey((k, traffic) -> traffic.Scope).groupByKey();

        KTable<Windowed<String>, Long> aggregatedStream = countries.count(TimeWindows.of(60 * 1000L), "UserCountStore");

        System.out.println("Starting Kafka Streams Countries Example");
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
        kafkaStreams.start();
        System.out.println("Now started CountriesStreams Example");
    }

    private static Properties getProperties() {
        Properties settings = new Properties();

        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.106.9.235:9092,10.106.9.235:9093,10.106.9.235:9094");
        settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "10.106.9.235:2181");
        //settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        //settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return settings;
    }
}

I am getting the class cast exception below:

Exception in thread "countries-streaming-analysis-app-f7f95119-4401-4a6e-8060-5a138ffaddb2-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=vanitopic, partition=0, offset=2036
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.cisco.streams.countries.App$CountryMessage). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:91)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)
    ... 3 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:88)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
    ... 16 more

I need help understanding how and where to apply the custom Serdes that I have created.

Have you tried using stringSerde and countryMessageSerde in the getProperties method (as StreamsConfig.KEY_SERDE_CLASS_CONFIG and StreamsConfig.VALUE_SERDE_CLASS_CONFIG)? Also, what version of Kafka Streams do you use? - sap1ens
You need to apply custom Serdes on each operator that does not match the default Serdes from StreamsConfig. Check out the docs: docs.confluent.io/3.3.1/streams/developer-guide/dsl-api.html - Matthias J. Sax
Yes, I am doing this: settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, CountryMessage.class); but I am still getting the error. I am using 0.11.0.0. - Shilpi
Please help me solve this. I have tried almost everything but keep getting the same error. - Shilpi

2 Answers

2
votes

In your code,

KGroupedStream<String, CountryMessage> countries = countriesStream.selectKey((k, traffic) -> traffic.Scope).groupByKey();

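groupByKey() needs both Serdes (key and value) set explicitly: the preceding selectKey() changes the key, so grouping triggers a data repartitioning, and the records must be serialized to be written to the repartition topic. Alternatively, set the default Serdes in StreamsConfig to match the String and CountryMessage types.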
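For the 0.11.0.0 release mentioned in the comments, the change might look roughly like the sketch below. It assumes the two-argument groupByKey(keySerde, valSerde) overload of that release and a countryMessageSerde built as in the question. The class and method names (GroupByKeyWithSerdes, countByScope) are made up for illustration, and CountryMessage is trimmed to the one field used here. It needs kafka-streams 0.11.0.0 on the classpath.

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

// Hypothetical helper class; only the groupByKey(...) call differs from the question.
public class GroupByKeyWithSerdes {

    // Trimmed copy of the question's POJO (assumption: only Scope matters here).
    public static class CountryMessage {
        public String Scope;
    }

    static KTable<Windowed<String>, Long> countByScope(
            KStreamBuilder builder, Serde<CountryMessage> countryMessageSerde) {
        Serde<String> stringSerde = Serdes.String();

        // Serdes for reading the source topic, as in the question.
        KStream<String, CountryMessage> countriesStream =
                builder.stream(stringSerde, countryMessageSerde, "vanitopic");

        // selectKey() changes the key, so groupByKey() repartitions the data.
        // Passing the Serdes here avoids falling back to the defaults.
        KGroupedStream<String, CountryMessage> countries = countriesStream
                .selectKey((k, traffic) -> traffic.Scope)
                .groupByKey(stringSerde, countryMessageSerde);

        return countries.count(TimeWindows.of(60 * 1000L), "UserCountStore");
    }
}
```

The Serdes passed to groupByKey() are the ones used to write to and read from the repartition topic, which is where the SinkNode in the stack trace fails.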

As mentioned in my comment, every operator that does not use the default Serdes from StreamsConfig needs to set the correct Serdes.

Thus, the count() operation also needs the corresponding String and Long Serdes:

countries.count(TimeWindows.of(60 * 1000L), "UserCountStore");

All operators that might need Serdes have appropriate overloads. Just check the overloads of every operator you are using.

Check out the docs for more details: https://docs.confluent.io/current/streams/developer-guide/dsl-api.html
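To see why the trace ends in "java.lang.String cannot be cast to [B", here is a small plain-Java sketch of the mechanism, with no Kafka dependency. The Serializer interface, the two serializer classes, and the send/trySend helpers are simplified stand-ins written for this illustration, not the real Kafka classes.

```java
import java.nio.charset.StandardCharsets;

public class DefaultSerdeMismatch {

    // Simplified stand-in for Kafka's Serializer interface (illustration only).
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    // Mimics the byte-array serializer named in the error message.
    static class ByteArraySerializer implements Serializer<byte[]> {
        public byte[] serialize(byte[] data) {
            return data;
        }
    }

    // Mimics a serializer that matches the actual key type.
    static class StringSerializer implements Serializer<String> {
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Mimics a sink that applies whatever serializer it was configured with;
    // generics are erased, so a mismatch only surfaces at runtime.
    @SuppressWarnings("unchecked")
    static <K> byte[] send(Serializer<?> serializer, K key) {
        return ((Serializer<K>) serializer).serialize(key);
    }

    // Reports whether serializing the key worked or hit the cast failure.
    static String trySend(Serializer<?> serializer, Object key) {
        try {
            return "ok, " + send(serializer, key).length + " bytes";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // Default serializer with a String key: fails like the question's trace.
        System.out.println(trySend(new ByteArraySerializer(), "198S"));
        // Matching serializer for the same key: succeeds.
        System.out.println(trySend(new StringSerializer(), "198S"));
    }
}
```

The first call fails because the byte-array serializer receives a String, and the second succeeds because the serializer matches the key type. Supplying the matching Serdes on the operator plays the role of the second call.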

2
votes

Add Serdes to groupByKey:

KGroupedStream<String, CountryMessage> countries = countriesStream.selectKey((k, traffic) -> traffic.Scope).groupByKey(Grouped.with(Serdes.String(), new ObjectMapperSerde<>(CountryMessage.class)));