0
votes

When I send a record to kafka topic consumer recieves "nativeHeaders" with some unnecessary header (which HeaderMethodArgumentResolver can not even cast to Map).

I'm looking for some way to override HeaderMethodArgumentResolver method "getNativeHeaders" to exclude this garbage header and don't know how to provide this subclass to the spring.

There's an original method from org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver :

private Map<String, List<String>> getNativeHeaders(Message<?> message) {
   return (Map)message.getHeaders().get("nativeHeaders");
  }

Where this call:

message.getHeaders().get("nativeHeaders");

returns this: https://ibb.co/qrvMNMk (as you see there's extra field "headerValue" apart from key-value, which prevents casting)

Send record by kafkaTemplate like this:

kafkaTemplate.send(new ProducerRecord<String, TempContractEntity>(topics.getSubmit(), tempContractEntity));

Consumer gets messages by @KafkaListener annotation:

@KafkaListener(topics = "#{settingsService.getTopics()}")
public void processMessage(OrchestratorRequestImpl orchestratorRequest,
                           @Header(KafkaHeaders.RECEIVED_TOPIC) String topicName) throws Throwable{//...}

Generally I want to find a way to pre-process kafka headers

1

1 Answers

0
votes

The NonTrustedHeaderType indicates that something sent a message with that header and it's class is not trusted. This would not happen with the type of send you show - there is no Message<?> involved there, so something is missing from the picture in your question.

One thing you could do is add a ConsumerInterceptor to the consumer configuration and weed out the unwanted header in the onConsume() method.

But you should really figure out who's sending it.