0
votes

I have a Spring Boot app (app0) that uses Spring Cloud Stream Kafka to read from a topic.

There are two other apps (app1, app2) that produce messages into that topic.

app1 publishes messages using an interface OrderSource:

public interface OrderSource{ 

    String OUTPUT_PAYMENT = Topic.PAYMENT_RESULTS;

    @Output(OrderSource.OUTPUT_PAYMENT)
    MessageChannel output();

For instance:

orderSource.output().send(MessageBuilder.withPayload(order).build(), 500);

In this case, app0 reads the messages from app1 without any problem.

app2 publishes its messages using KafkaTemplate:

ListenableFuture<SendResult<Integer, String>> delivery = kafkaTemplate.send(Topic.PAYMENT_RESULTS, "{ ... }");
try {
    SendResult<Integer, String> result = delivery.get(timeout, TimeUnit.MILLISECONDS);

In this case I am observing the following exception from EmbeddedHeadersMessageConverter:

java.lang.StringIndexOutOfBoundsException: String index out of range: 152
    at java.lang.String.checkBounds(Unknown Source) ~[na:1.8.0_91]
    at java.lang.String.<init>(Unknown Source) ~[na:1.8.0_91]
    at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.oldExtractHeaders(EmbeddedHeadersMessageConverter.java:135) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]
    at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.extractHeaders(EmbeddedHeadersMessageConverter.java:105) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]

Apparently it is trying to extract headers from the payload of the message. How can I prevent this exception from happening while supporting both sources of messages (KafkaTemplate and OrderSource).

1

1 Answers

1
votes

To communicate with non Spring-Cloud-Stream apps, you need to configure the headerMode on the consumer to raw.

You will also need to do the same on the producer for app1 too so he doesn't embed headers.

See consumer properties and producer properties.