I am new to both Spring Cloud Stream and Kafka. I am receiving the following error from the Kafka producer when sending a string in the payload. I also tried the byte array serializer/deserializer, and JSON as opposed to plain text. Any help or insight is greatly appreciated.
error message: org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
error stack:
2019-10-25 16:13:40.762 ERROR 4628 --- [ XNIO-1 task-1] o.z.problem.spring.common.AdviceTraits : Internal Server Error
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@ebad77c]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:186)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1095)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at com.ll.kafkaservice.greeting.GreetingsService.sendGreeting(GreetingsService.java:30)
at com.ll.kafkaservice.greeting.GreetingsController.greetings(GreetingsController.java:29)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
....
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:894)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:470)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:407)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:242)
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:382)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1095)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at com.ll.kafkaservice.greeting.GreetingsService.sendGreeting(GreetingsService.java:30)
at com.ll.kafkaservice.greeting.GreetingsController.greetings(GreetingsController.java:29)
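For readers unfamiliar with the notation, `[B` is the JVM's name for `byte[]`, so the trace says a byte array reached a serializer that only accepts `String`. The following is a minimal sketch of that mismatch in plain Java. `SerializerMismatchDemo`, its nested `Serializer` interface, and `StringOnlySerializer` are hypothetical stand-ins that mimic the shape of Kafka's types so the example runs without Kafka on the classpath; they are not the library's classes.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins for illustration; not Kafka's real classes.
final class SerializerMismatchDemo {

    // Same shape as a typed serializer: topic plus a value of type T.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Accepts only String values, like a string serializer would.
    static final class StringOnlySerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Holds the serializer as a raw type, so the payload type is only
    // checked at run time, when the value is cast to String.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static String trySend(Object payload) {
        Serializer raw = new StringOnlySerializer();
        try {
            raw.serialize("greetings", payload);
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException for " + payload.getClass().getName();
        }
    }

    public static void main(String[] args) {
        // A String payload that was already converted to bytes upstream.
        byte[] converted = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(trySend("hello"));
        System.out.println(trySend(converted));
    }
}
```

Running `main` prints `ok` for the `String` payload and `ClassCastException for [B` for the byte array. In the real trace, that `ClassCastException` appears as the cause of the `SerializationException` shown above.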
Here are the Spring Cloud Stream settings:
cloud:
  stream:
    bindings:
      greetings-in:
        destination: greetings
        #content-type: application/json
        content-type: text/plain
      greetings-out:
        destination: greetings
        #content-type: application/json
        content-type: text/plain
Producer settings
2019-10-25 16:12:28.346 INFO 4628 --- [ restartedMain] com.ll.kafkaservice.KafkaServiceApp : Started KafkaServiceApp in 22.248 seconds (JVM running for 23.136)
2019-10-25 16:12:28.366 DEBUG 4628 --- [ restartedMain] c.l.k.aop.logging.LoggingAspect : Enter: com.ll.kafkaservice.service.KafkaServiceKafkaProducer.init() with argument[s] = []
2019-10-25 16:12:28.377 INFO 4628 --- [ restartedMain] c.l.k.service.KafkaServiceKafkaProducer : Kafka producer initializing...
2019-10-25 16:12:28.378 INFO 4628 --- [ restartedMain] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id =
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2019-10-25 16:12:28.399 INFO 4628 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.0
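The log above shows `value.serializer` set to `StringSerializer`, while the error reports a `byte[]` value. As a rough sketch only, a producer that accepts byte arrays would carry properties like the ones below. The property keys and the `ByteArraySerializer` class name are standard Kafka ones; `ByteArrayProducerProps` is a hypothetical holder class, and it is an assumption that the binder's producer is configured from the same values as the producer logged here.

```java
import java.util.Properties;

// Hypothetical holder class; only the keys and class names are Kafka's.
final class ByteArrayProducerProps {

    // Builds producer properties whose value serializer accepts byte[].
    // The broker address and key serializer are copied from the log above.
    static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }

    public static void main(String[] args) {
        // Print each setting in the same "key = value" style as the log.
        build().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Whether this changes the outcome depends on how the application passes these properties to the producer that the binder actually uses.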
Here is the object containing the message:
package com.ll.kafkaservice.messaging;

import java.io.Serializable;

public class Greeting implements Serializable {

    private static final long serialVersionUID = 1L;

    private String message;

    public Greeting() {
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("{");
        sb.append("message:");
        sb.append(message);
        sb.append("}");
        return sb.toString();
    }
}
Define the streams
package com.ll.kafkaservice.greeting;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface GreetingsStreams {

    String INPUT = "greetings-in";
    String OUTPUT = "greetings-out";

    @Input(INPUT)
    SubscribableChannel inboundGreetings();

    @Output(OUTPUT)
    MessageChannel outboundGreetings();
}
Bind the streams
package com.ll.kafkaservice.config;

import org.springframework.cloud.stream.annotation.EnableBinding;

import com.ll.kafkaservice.greeting.GreetingsStreams;

@EnableBinding(GreetingsStreams.class)
public class StreamsConfiguration {
}
Produce/send message
package com.ll.kafkaservice.greeting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;

import com.ll.kafkaservice.messaging.Greeting;

@Service
public class GreetingsService {

    private final Logger log = LoggerFactory.getLogger(GreetingsService.class);

    private final GreetingsStreams greetingsStreams;
    private MessageChannel messageChannel;

    public GreetingsService(GreetingsStreams greetingsStreams) {
        this.greetingsStreams = greetingsStreams;
    }

    public void sendGreeting(final Greeting greeting) {
        messageChannel = greetingsStreams.outboundGreetings();
        log.info("Before send {}", greeting.toString());
        messageChannel.send(MessageBuilder
                // Sends a string to payload not the object
                .withPayload(greeting.getMessage())
                // Note: tried this with and without the header
                //.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
                .build());
    }
}
Consume/Receive message
package com.ll.kafkaservice.greeting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import com.ll.kafkaservice.messaging.Greeting;

@Component
public class GreetingsListener {

    private final Logger log = LoggerFactory.getLogger(GreetingsListener.class);

    @StreamListener(GreetingsStreams.INPUT)
    public void handleGreetings(@Payload Greeting greetings) {
        log.info("Received greetings: {}", greetings.getMessage());
    }

    //@StreamListener(GreetingsStreams.INPUT)
    //public void handleGreetings(String greetings) {
    //    log.info("Received greetings: {}", greetings);
    //}
}
The "[B cannot be cast to java.lang.String" error can also occur if the POJO does not define a public, no-argument constructor. That's not the case in this question, but FYI for future readers. See here. – Tomboyo
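The constructor requirement mentioned in this comment can be checked with plain reflection. The sketch below is only an illustration: `NoArgConstructorCheck`, `WithDefault`, and `WithoutDefault` are hypothetical names, and it is an assumption that the message converter in use instantiates the payload type through its public no-argument constructor.

```java
import java.lang.reflect.Constructor;

// Hypothetical helper and POJOs, for illustration only.
final class NoArgConstructorCheck {

    // Has a public no-argument constructor, like Greeting in the question.
    public static class WithDefault {
        public WithDefault() {
        }
    }

    // Only has a constructor that takes an argument.
    public static class WithoutDefault {
        public WithoutDefault(String message) {
        }
    }

    // Returns true when the type exposes a public no-argument constructor.
    static boolean hasPublicNoArgConstructor(Class<?> type) {
        try {
            Constructor<?> ctor = type.getConstructor();
            return ctor.getParameterCount() == 0;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasPublicNoArgConstructor(WithDefault.class));
        System.out.println(hasPublicNoArgConstructor(WithoutDefault.class));
    }
}
```

Running `main` prints `true` for `WithDefault` and `false` for `WithoutDefault`.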