0
votes

I am trying to set up a binding to forward Kafka messages from the Spring Integration errorChannel to a custom channel (for centralised error processing).

The error messages are being sent to the configured channel, but they are arriving as a GenericMessage with a byte[] payload, which consists of exception details and the failed message.

My config:

spring:
  cloud:
    stream:
      kafka:
        bindings:
          accountOut.producer:
            sync: true
        binder:
          autoCreateTopics: false
          headers:
               - spanId
               - spanTraceId
               - spanSampled
               - spanParentSpanId
               - spanName
               - spanFlags
               - eventType
               - Authorization
      bindings:
        error:
          destination: test-error
        accountOut:
          producer.partitionKeyExpression: payload.key
          content-type: application/json
          destination: account
  kafka:
    producer.keySerializer: org.apache.kafka.common.serialization.StringSerializer
    consumer.valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer

I'm listening with a @StreamListener(target = "kieran-error") and the consumer configured with @Input("kieran-error") SubscribableChannel

Reading the docs, I was expecting the message to arrive as and ErrorMessage. Is there a way I can acheive this? Or to configure the payload to arrive as an Object?

Versions I'm using:

  • Spring Boot 1.5.8
  • Spring Cloud Edgware
  • Kafka 11 client
  • Spring Integration core 4.3.12

--- Question Update ---

I realise now I could configure Spring Integration to forward to a Kafka Topic by listening to the errorChannel, e.g.

@Bean
@ServiceActivator(inputChannel = "errorChannel")
public MessageHandler handler() throws Exception {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setTopicExpression(new LiteralExpression("someTopic"));
    handler.setMessageKeyExpression(new LiteralExpression("someKey"));
    return handler;
}

But is it possible to configure this flow in the properties yaml rather than code? That's where all the other Kafka config is so configuring the kafka template in code is not ideal.

Another option would be to listen for the ErrorMessage explicitly and send to a kafka output channel in the code:

@ServiceActivator(inputChannel = "errorChannel")
public void handle(ErrorMessage em) {
    outputChannel.kieranError().send(...)
}
1

1 Answers

1
votes

Exactly where are you consuming such a message?

The message you describe sounds like a message sent to the DLQ topic when enableDlq is true for a consumer; you show no consumer configuration so it's hard for me to guess.

The ErrorMessage sent to the destination-specific error channel (and bridged to the global errorChannel can be consumed with

@ServiceActivator(inputChannel = "errorChannel")
public void handle(ErrorMessage em) {
    ...
}

The

error:
  destination:

is legacy and was intended for user code to send messages to the errorChannel that would go to that topic.