I have a Spring Boot app (app0) that uses Spring Cloud Stream Kafka to read from a topic.
There are two other apps (app1, app2) that produce messages into that topic.
app1 publishes messages using an interface OrderSource:
public interface OrderSource {
    String OUTPUT_PAYMENT = Topic.PAYMENT_RESULTS;

    @Output(OrderSource.OUTPUT_PAYMENT)
    MessageChannel output();
}
For instance:
orderSource.output().send(MessageBuilder.withPayload(order).build(), 500);
In this case, app0 reads the messages from app1 without any problem.
app2 publishes its messages using KafkaTemplate:
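ListenableFuture<SendResult<Integer, String>> delivery = kafkaTemplate.send(Topic.PAYMENT_RESULTS, "{ ... }");
try {
    SendResult<Integer, String> result = delivery.get(timeout, TimeUnit.MILLISECONDS);
    // ...
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    // error handling omitted
}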
In this case I am observing the following exception from EmbeddedHeadersMessageConverter:
java.lang.StringIndexOutOfBoundsException: String index out of range: 152
at java.lang.String.checkBounds(Unknown Source) ~[na:1.8.0_91]
at java.lang.String.<init>(Unknown Source) ~[na:1.8.0_91]
at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.oldExtractHeaders(EmbeddedHeadersMessageConverter.java:135) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]
at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.extractHeaders(EmbeddedHeadersMessageConverter.java:105) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]
Apparently it is trying to extract headers from the message payload, even though the messages sent with KafkaTemplate contain only the raw JSON string. How can I prevent this exception while still supporting both sources of messages (KafkaTemplate and OrderSource)?
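To show what I suspect is going on, here is a minimal sketch. It is not the actual EmbeddedHeadersMessageConverter code. It assumes a simplified, hypothetical layout in which the first byte is a header count and each header name is one length byte followed by that many bytes. The class and method names (EmbeddedHeaderSketch, readFirstHeaderName, failsOnPlainPayload) are made up for illustration. When a plain JSON payload is fed into such a reader, its first bytes are misread as a count and a length, which leads to the same kind of out-of-range error:

```java
import java.nio.charset.StandardCharsets;

public class EmbeddedHeaderSketch {

    // Hypothetical, simplified reader (NOT the library's real implementation):
    // byte 0 is treated as the number of headers, byte 1 as the length of the
    // first header name, and the name itself starts at byte 2.
    static String readFirstHeaderName(byte[] payload) {
        int headerCount = payload[0] & 0xff;   // for '{' this is 123
        if (headerCount == 0) {
            return "";                         // no headers embedded
        }
        int nameLength = payload[1] & 0xff;    // next payload byte, misread as a length
        // Throws StringIndexOutOfBoundsException if nameLength runs past the array end.
        return new String(payload, 2, nameLength, StandardCharsets.UTF_8);
    }

    // Returns true if parsing the given plain-text body fails with an index error.
    static boolean failsOnPlainPayload(String body) {
        try {
            readFirstHeaderName(body.getBytes(StandardCharsets.UTF_8));
            return false;
        } catch (IndexOutOfBoundsException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A raw JSON string, like the one app2 sends, has no embedded header block.
        System.out.println(failsOnPlainPayload("{\"id\":1}")); // prints true
    }
}
```

With a payload that really follows the assumed layout, for example the bytes {1, 2, 'i', 'd'}, the same method returns "id", so the failure only shows up for messages that were never wrapped with headers in the first place.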