I have two applications - the first produces messages using spring-cloud-stream/function with the AWS Kinesis Binder, the second is an application that builds off of spring integration to consume messages. Communicating between the two is not a problem - I can send a message from "stream" and handle it easily in "integration".
When I want to send a custom header, then there is an issue. The header arrives at the consumer as an embedded header using the "New" format (Has an 0xff at the beginning, etc.) - See AbstractMessageChannelBinder#serializeAndEmbedHeadersIfApplicable in spring-cloud-stream.
However, the KinesisMessageDrivenChannelAdapter (spring-integration-aws) does not seem to understand the "new" embedded header form. It uses EmbeddedJsonHeadersMessageMapper (See #toMessage) which cannot "decode" the message. It throws a com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
because of the additional information included in the embedded header (0xff and so on).
I need to send the header across the wire (the header is used to route on the other side), so it's not an option to "turn off" headers on the producer. I don't see a way to use the "old" embedded headers.
I'd like to use spring-cloud-stream/function on the producer side - it's awesome. I wish I could redo the consumer, but...
I could write my own embedded header mapper that understands the new format (use EmbeddedHeaderUtils), and wire it into the KinesisMessageDrivenChannelAdapter.
Given the close relationship between spring-cloud-stream and spring-integration, I must be doing something wrong. Does Spring Integration have an OutboundMessageMapper that understands the new embedded form?
Or is there a way to coerce spring cloud stream to use a different embedding strategy?
I could use Spring Integration on the producer side. (sad face).
Any thoughts? Thanks in advance.