2 votes

I receive messages from an MQTT broker on a Kafka topic via the Kafka-MQTT connector. I then read these messages from the Kafka topic with a Kafka consumer in Spark. When I print the messages, this is the result. How can I read the messages correctly?

This is the code that sets up the consumer and creates the stream.

SparkConf sparkConf = new SparkConf().setAppName("GestoreSoccorso").setMaster("local[2]");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(500));

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "10.0.4.215:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("ankioverdrive_v1_events");

JavaInputDStream<ConsumerRecord<String, String>> stream =
        KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
        );

Then this is the code I use to read the messages from the topic and print them.

stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
    @Override
    public void call(JavaRDD<ConsumerRecord<String, String>> consumerRecordJavaRDD) throws Exception {
        consumerRecordJavaRDD.foreach(new VoidFunction<ConsumerRecord<String, String>>() {
            @Override
            public void call(ConsumerRecord<String, String> stringStringConsumerRecord) throws Exception {
                String stringa = stringStringConsumerRecord.value();
                System.out.println(DEBUG + "DATI RICEVUTI -> " + stringa);
            }
        });
    }
});

Finally, this is the output:

DEBUG: DATI RICEVUTI ->     ܑ�՛Y
SKULL   �    unknown0% 
How do you know the MQTT data was serialized as a string? – OneCricketeer

I know it because I have seen the code that produces the MQTT message: it is JSON converted to a string and then published. – Francesco

If it were a string or JSON, you would not be seeing ܑ�՛Y characters. The message is either compressed, encrypted, or not JSON/plaintext. Can you show the output of kafka-console-consumer? – OneCricketeer

I can't see the console output now; I can tomorrow. I saw the output this morning, and on the consumer console there are many lines of this type: x00/x00/.../SKULL/..../unknown/... I think this is hex. – Francesco

Looks like it, yes. Your topic has binary data that isn't entirely UTF-8 strings. In Spark, you could use ByteArrayDeserializer.class.getName(), but you still need to determine how to decode the bytes, which can only be done by knowing what format they were in when they entered Kafka to begin with. – OneCricketeer

1 Answer

0 votes

You are passing the wrong type of value for the key.deserializer and value.deserializer parameters.

Instead of

kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);

you need to pass

kafkaParams.put("key.deserializer", StringDeserializer.class.getName());
kafkaParams.put("value.deserializer", StringDeserializer.class.getName());

or simply

kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");