I am trying to set up a binding to forward Kafka messages from the Spring Integration errorChannel to a custom channel (for centralised error processing).
The error messages are reaching the configured channel, but they arrive as a GenericMessage with a byte[] payload containing the exception details and the failed message.
My config:
spring:
  cloud:
    stream:
      kafka:
        bindings:
          accountOut.producer:
            sync: true
        binder:
          autoCreateTopics: false
          headers:
            - spanId
            - spanTraceId
            - spanSampled
            - spanParentSpanId
            - spanName
            - spanFlags
            - eventType
            - Authorization
      bindings:
        error:
          destination: test-error
        accountOut:
          producer.partitionKeyExpression: payload.key
          content-type: application/json
          destination: account
  kafka:
    producer.keySerializer: org.apache.kafka.common.serialization.StringSerializer
    consumer.valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer
I'm listening with @StreamListener(target = "kieran-error"), and the consumer is configured as @Input("kieran-error") SubscribableChannel.
Reading the docs, I was expecting the message to arrive as an ErrorMessage. Is there a way I can achieve this? Or can the payload be configured to arrive as an Object?
Versions I'm using:
- Spring Boot 1.5.8
- Spring Cloud Edgware
- Kafka 11 client
- Spring Integration core 4.3.12
--- Question Update ---
I realise now that I could configure Spring Integration to forward to a Kafka topic by listening to the errorChannel, e.g.
@Bean
@ServiceActivator(inputChannel = "errorChannel")
public MessageHandler handler() throws Exception {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setTopicExpression(new LiteralExpression("someTopic"));
    handler.setMessageKeyExpression(new LiteralExpression("someKey"));
    return handler;
}
But is it possible to configure this flow in the properties YAML rather than in code? That's where all the other Kafka config lives, so configuring the KafkaTemplate in code is not ideal.
Another option would be to listen for the ErrorMessage explicitly and send it to a Kafka output channel in code:
@ServiceActivator(inputChannel = "errorChannel")
public void handle(ErrorMessage em) {
    outputChannel.kieranError().send(...)
}
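With that second option, the payload of an ErrorMessage is a Throwable, so I would probably need to flatten it into something serialisable before sending it to the output channel. Below is a rough sketch of what I have in mind, using only JDK types. ErrorSummary, its from method and the map keys are hypothetical names of my own and not part of Spring; the error argument stands in for em.getPayload(), and failedPayload for the payload of the message that failed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not a Spring class): flattens an error into plain
// strings so it can be serialised like any other payload.
final class ErrorSummary {

    private ErrorSummary() {
    }

    // 'error' stands in for ErrorMessage.getPayload(); 'failedPayload' stands in
    // for the payload of the message that failed (may be null).
    static Map<String, String> from(Throwable error, Object failedPayload) {
        // Walk the cause chain to find the root cause.
        Throwable root = error;
        while (root.getCause() != null) {
            root = root.getCause();
        }

        // LinkedHashMap keeps the keys in insertion order for readable output.
        Map<String, String> summary = new LinkedHashMap<>();
        summary.put("exceptionType", error.getClass().getName());
        summary.put("message", String.valueOf(error.getMessage()));
        summary.put("rootCauseType", root.getClass().getName());
        summary.put("rootCauseMessage", String.valueOf(root.getMessage()));
        summary.put("failedPayload", String.valueOf(failedPayload));
        return summary;
    }
}
```

The handler above could then build the outgoing message from ErrorSummary.from(em.getPayload(), ...) rather than from the exception itself, assuming the map is converted to JSON by the binding's content-type.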