3
votes

According to this topic:
Kafka Spring Integration: Headers not coming for kafka consumer - this is no headers support for Kafka

But documentation says:

spring.cloud.stream.kafka.binder.headers
The list of custom headers that will be transported by the binder.

Default: empty.

I can't get it working with spring-cloud-stream-binder-kafka: 1.2.0.RELEASE

SENDING LOG:

MESSAGE (e23885fd-ffd9-42dc-ebe3-5a78467fee1f) SENT : 
GenericMessage [payload=..., 
headers={
   content-type=application/json, 
   correlationId=51dd90b1-76e6-4b8d-b667-da25f214f383, 
   id=e23885fd-ffd9-42dc-ebe3-5a78467fee1f, 
   contentType=application/json, 
   timestamp=1497535771673
}]

RECEIVING LOG:

MESSAGE (448175f5-2b21-9a44-26b9-85f093b33f6b) RECEIVED BY HANDLER 1: 
GenericMessage [payload=..., 
headers={
    kafka_offset=36, 
    id=448175f5-2b21-9a44-26b9-85f093b33f6b, 
    kafka_receivedPartitionId=0, 
    contentType=application/json;charset=UTF-8, 
    kafka_receivedTopic=new_patient, timestamp=1497535771715
}]

MESSAGE (448175f5-2b21-9a44-26b9-85f093b33f6b) RECEIVED BY HANDLER 2 :
GenericMessage [payload=..., 
headers={
    kafka_offset=36, 
    id=448175f5-2b21-9a44-26b9-85f093b33f6b, 
    kafka_receivedPartitionId=0, 
    contentType=application/json;charset=UTF-8, 
    kafka_receivedTopic=new_patient, timestamp=1497535771715
}]

I expect to see the same message id and get correlationId on receiving side.

application.properties:

spring.cloud.stream.kafka.binder.headers=correlationId
spring.cloud.stream.bindings.newTest.destination=new_test
spring.cloud.stream.bindings.newTestCreated.destination=new_test
spring.cloud.stream.default.consumer.headerMode=embeddedHeaders
spring.cloud.stream.default.producer.headerMode=embeddedHeaders

SENDING MESSAGE:

@Publisher(channel = "testChannel")
public Object newTest(Object param) {
    ...
    return myObject;
}
1
Can you provide an example of what are you trying to do? - Marius Bogoevici
@MariusBogoevici Please, see the post update... - S2201

1 Answers

3
votes

Yes, it does: http://docs.spring.io/spring-cloud-stream/docs/Chelsea.SR2/reference/htmlsingle/index.html#_consumer_properties

headerMode

When set to raw, disables header parsing on input. Effective only for messaging middleware that does not support message headers natively and requires header embedding. Useful when inbound data is coming from outside Spring Cloud Stream applications.

Default: embeddedHeaders

But that is already Spring Cloud Stream story, not Spring Kafka per se.