3
votes

We are using spring-cloud-stream and planning to upgrade our Kafka version.
Our applications using spring-cloud-stream:2.0.0 (spring-kafka 2.1.7) with apache kafka server 1.0.1 and also using spring-cloud-sleuth:2.0.0 for tracking.
We are going to upgrade our Kafka server to version 2.3.0 so it requires an upgrade to spring-boot 2.2.x (Hoxton) with spring-cloud-sleuth:2.2.0 and spring-cloud-stream:3.0.3 (Horsham.SR3).
We have ~200 applications that using Kafka hence the upgrade will be gradually, so as intermediate state we are going to have producers on the newer version and consumers using old version.
Our consumers are using @StreamListener.

During our tests we encountered an issue with parsing most of the headers with type String and getting the following:

ERROR 27448 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$4  : Could not decode json type: ecb89ccb3e79418b for key: X-B3-TraceId
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ecb89ccb3e79418b': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"ecb89ccb3e79418b"; line: 1, column: 33]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2621) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091) ~[jackson-databind-2.9.6.jar:2.9.6]
    at org.springframework.kafka.support.DefaultKafkaHeaderMapper.lambda$toHeaders$1(DefaultKafkaHeaderMapper.java:233) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at java.lang.Iterable.forEach(Iterable.java:75) ~[na:1.8.0_221]
    at org.springframework.kafka.support.DefaultKafkaHeaderMapper.toHeaders(DefaultKafkaHeaderMapper.java:216) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.toHeaders(KafkaMessageChannelBinder.java:554) ~[spring-cloud-stream-binder-kafka-2.0.0.RELEASE.jar:2.0.0.RELEASE]
    at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:106) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:229) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
...

While the Types header is:

{spanTraceId=java.lang.String, spanId=java.lang.String, spanParentSpanId=java.lang.String, nativeHeaders=org.springframework.util.LinkedMultiValueMap, X-B3-SpanId=java.lang.String, X-B3-ParentSpanId=java.lang.String, scst_partition=java.lang.Integer, X-B3-Sampled=java.lang.String, X-B3-TraceId=java.lang.String, spanSampled=java.lang.String, contentType=java.lang.String}

For instance the X-B3-SpanId that was added by Sleuth is of type String and the value is: ecb89ccb3e79418b which is not JSON string, therefore the ObjectMapper fails on conversion to String Object here:

headers.put(h.key(), getObjectMapper().readValue(h.value(), type))

Looks like it should not use ObjectMapper when we have String types, hence our old consumers are failing.

Is there a way to prevent this issue when using new producer and old consumer?

1

1 Answers

0
votes

You can configure the DefaultKafkaHeaderMapper to be compatible with older versions:

    /**
     * Set to true to encode String-valued headers as JSON ("..."), by default just the
     * raw String value is converted to a byte array using the configured charset. Set to
     * true if a consumer of the outbound record is using Spring for Apache Kafka version
     * less than 2.3
     * @param encodeStrings true to encode (default false).
     * @since 2.3
     */
    public void setEncodeStrings(boolean encodeStrings) {
        this.encodeStrings = encodeStrings;
    }

Also see https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.0.10.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_binder_properties

spring.cloud.stream.kafka.binder.headerMapperBeanName