1
votes

I new to both Spring Cloud Stream and Kafka. I am receiving the following error from the kafka producer when sending a string in the payload. Any help or insights is greatly appreciated. I tried using bytearray serializer/deserializer as well as json as oppposed to plain text.

error message: org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

error stack:

2019-10-25 16:13:40.762 ERROR 4628 --- [  XNIO-1 task-1] o.z.problem.spring.common.AdviceTraits   : Internal Server Error

org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@ebad77c]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:186)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1095)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
        at com.ll.kafkaservice.greeting.GreetingsService.sendGreeting(GreetingsService.java:30)
        at com.ll.kafkaservice.greeting.GreetingsController.greetings(GreetingsController.java:29)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
		....
		
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String
        at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
        at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:894)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:470)
        at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:407)
        at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:242)
        at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:382)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1095)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
        at com.ll.kafkaservice.greeting.GreetingsService.sendGreeting(GreetingsService.java:30)
        at com.ll.kafkaservice.greeting.GreetingsController.greetings(GreetingsController.java:29)

Listed here is Spring Cloud Stream settings

  cloud:
    stream:
      bindings:
          greetings-in:
              destination: greetings
              #content-type: application/json
              content-type: text/plain
          greetings-out:
              destination: greetings
              #content-type: application/json
              content-type: text/plain

Producer settings

2019-10-25 16:12:28.346  INFO 4628 --- [  restartedMain] com.ll.kafkaservice.KafkaServiceApp      : Started KafkaServiceApp in 22.248 seconds (JVM running for 23.136)
2019-10-25 16:12:28.366 DEBUG 4628 --- [  restartedMain] c.l.k.aop.logging.LoggingAspect          : Enter: com.ll.kafkaservice.service.KafkaServiceKafkaProducer.init() with argument[s] = []
2019-10-25 16:12:28.377  INFO 4628 --- [  restartedMain] c.l.k.service.KafkaServiceKafkaProducer  : Kafka producer initializing...
2019-10-25 16:12:28.378  INFO 4628 --- [  restartedMain] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
        acks = 1
        batch.size = 16384
        bootstrap.servers = [localhost:9092]
        buffer.memory = 33554432
        client.dns.lookup = default
        client.id =
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 2147483647
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2019-10-25 16:12:28.399  INFO 4628 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.0

Listed here is the object containing the message

package com.ll.kafkaservice.messaging;

import java.io.Serializable;

public class Greeting  implements Serializable {
	private static final long serialVersionUID = 1L;
	
	private String message;

	
	public Greeting() {
	}

	
	public String getMessage() {
		return message;
	}

	public void setMessage(String message) {
		this.message = message;
	}
	
	public String toString() {
		StringBuffer sbuffer = new StringBuffer();
		
		sbuffer.append("{");
		sbuffer.append("message:");
		sbuffer.append(message);
		sbuffer.append("}");
		
		return sbuffer.toString();
	}
}

Define the streams

package com.ll.kafkaservice.greeting;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;


public interface GreetingsStreams {
    String INPUT = "greetings-in";
    String OUTPUT = "greetings-out";
    
    @Input(INPUT)
    SubscribableChannel inboundGreetings();
    
    @Output(OUTPUT)
    MessageChannel outboundGreetings();
}

Bind the streams

package com.ll.kafkaservice.config;

import org.springframework.cloud.stream.annotation.EnableBinding;
import com.ll.kafkaservice.greeting.GreetingsStreams;


@EnableBinding(GreetingsStreams.class)
public class StreamsConfiguration {

}

Produce/send message

package com.ll.kafkaservice.greeting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;

import com.ll.kafkaservice.messaging.Greeting;


@Service
public class GreetingsService {
    private final Logger log = LoggerFactory.getLogger(GreetingsService.class);
    
    private final GreetingsStreams greetingsStreams;
    
    private MessageChannel messageChannel;
    
    
    public GreetingsService(GreetingsStreams greetingsStreams) {
        this.greetingsStreams = greetingsStreams;
    }
    
    public void sendGreeting(final Greeting greeting) {
        messageChannel = greetingsStreams.outboundGreetings();
        log.info("Before send {}", greeting.toString());
        messageChannel.send(MessageBuilder
        		// Sends a string to payload not the object
                .withPayload(greeting.getMessage())
                // Note:  tried this with and without the header
                //.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
                .build());
    }
}

Consume/Receive message

package com.ll.kafkaservice.greeting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import com.ll.kafkaservice.messaging.Greeting;



@Component
public class GreetingsListener {
    private final Logger log = LoggerFactory.getLogger(GreetingsListener.class);

    @StreamListener(GreetingsStreams.INPUT)
    public void handleGreetings(@Payload Greeting greetings) {
        log.info("Received greetings: {}", greetings.getMessage());
    }
    
    //@StreamListener(GreetingsStreams.INPUT)
    //public void handleGreetings(String greetings) {
    //    log.info("Received greetings: {}", greetings);
    //}
}
2
Note that the [B cannot be cast to java.lang.String error can also occur if the POJO does not define a public, no-argument constructor. That's not the case in this question, but FYI for future readers. See here.Tomboyo

2 Answers

3
votes

By default, the SCSt framework converts the payload to byte[] and uses ByteArraySerializers.

Since you have configured the binding to use custom serializers, you must set useNativeEncoding to true. See Producer Properties.

useNativeEncoding

When set to true, the outbound message is serialized directly by the client library, which must be configured correspondingly (for example, setting an appropriate Kafka producer value serializer). When this configuration is being used, the outbound message marshalling is not based on the contentType of the binding. When native encoding is used, it is the responsibility of the consumer to use an appropriate decoder (for example, the Kafka consumer value de-serializer) to deserialize the inbound message. Also, when native encoding and decoding is used, the headerMode=embeddedHeaders property is ignored and headers are not embedded in the message. See the consumer property useNativeDecoding.

However you will need to use a JsonSerializer not just a String serializer if you want to send a POJO.

Is there some reason you are not relying on the framework to do the conversion for you?

0
votes

You need to post the pom and the application properties file as well. My bet is that the application might have a lines like this :

kafka:
consumer:
  key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
  value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
  bootstrap.servers: localhost:9092
  group.id: fixed-asset-service
  auto.offset.reset: earliest
producer:
  key.serializer: org.apache.kafka.common.serialization.StringSerializer
  value.serializer: org.apache.kafka.common.serialization.StringSerializer
  bootstrap.servers: localhost:9092

This is commonly because the blogs and tutorials tend only to show you how to send a string. So immediately you try some custom object it will almost undoubtedly fail. With spring-cloud-stream one does not need to specify the deserializers as is done automatically. Its one of the reasons one would use a framework like spring-cloud-stream And if you do find such a line in the application properties whether you could replace it with the following :

key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

It should work. Better yet remain the lines, and retain the spring.cloud.streams configuration