I have a REST controller which calls a gateway annotated with @MessagingGateway(errorChannel = ERROR_CHANNEL).
This way, any error that occurs downstream of the integration flow initiated by the gateway is sent to an error channel, which is handled by another integration flow. This works as expected.
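For context, a minimal sketch of that working setup. The gateway interface, the channel names and the configuration class are placeholders I made up for this post, not my real ones, and the error flow here only logs the failure.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

// Hypothetical gateway: channel names are assumptions for illustration only.
@MessagingGateway(defaultRequestChannel = "attachmentRequestChannel",
        errorChannel = "attachmentErrorChannel")
interface AttachmentGateway {

    void send(String payload);
}

@Configuration
class ErrorFlowConfig {

    // Separate flow that consumes whatever the gateway diverts to its error channel.
    @Bean
    IntegrationFlow errorHandlingFlow() {
        return IntegrationFlows.from("attachmentErrorChannel")
                .handle(message -> {
                    // Messages on an error channel normally carry a Throwable payload.
                    Object payload = message.getPayload();
                    if (payload instanceof Throwable) {
                        System.err.println("Flow failed: " + ((Throwable) payload).getMessage());
                    }
                })
                .get();
    }
}
```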
Now there is another scenario: one integration flow reads messages from Kafka and routes them to another channel, a second flow processes those messages, and a third flow sends an HTTP request to a remote service.
    public IntegrationFlowBuilder attachmentEventTenantRouter(String tenantId) {
        return attachmentEventBaseFlow(".*")
                .filter(Message.class, m ->
                        m.getHeaders().get(KafkaConstants.HEADER_PREFIX + MessageHeader.TENANT_ID_KEY) != null
                                && m.getHeaders().get(KafkaConstants.HEADER_PREFIX + MessageHeader.TENANT_ID_KEY, String.class)
                                        .equalsIgnoreCase(tenantId));
    }

    private IntegrationFlowBuilder attachmentEventBaseFlow(String eventRegex) {
        return IntegrationFlows
                .from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory.createContainer(topic))
                        .errorChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME))
                .log(LoggingHandler.Level.DEBUG, "Inside Kafka Consumer")
                .filter(Message.class, m -> filter(m, eventRegex))
                .transform(messageToEventTransformer);
    }

    @Bean
    public IntegrationFlow kafkaConsumerFlow() {
        return fromKafkaFlowHelper.attachmentEventTenantRouter(TENANT_ID)
                .route(Message.class,
                        m -> m.getHeaders().get(KafkaConstants.HEADER_PREFIX + MessageHeader.EVENT_TYPE_KEY, String.class),
                        p -> p
                                .resolutionRequired(false)
                                .channelMapping("eventType", "transformMessagesFromKafkaAndPublishAnotherEvent")
                                .defaultOutputChannel("nullChannel"))
                .get();
    }

    @Bean
    public IntegrationFlow transformMessagesFromKafkaAndPublishAnotherEvent() {
        return flow -> flow
                .transform(transformer)
                .handle(getKafkaHandler());
    }

    @Bean
    public IntegrationFlow sendHttpRequestToRemoteServiceFromKafkaEvent() {
        return flow -> flow
                .transform(transformer)
                .handle(gatewayCall, e -> e.advice(expressionAdvice()));
    }
How can I handle the exceptions that might occur in the flows above?
As you can see, I am using an ExpressionEvaluatingRequestHandlerAdvice, which does the job for the handle method, but I'm not sure how to handle exceptions that might occur in the transformers.
The messaging gateway with an error channel configured does the trick when the gateway is called by a REST controller, but when the flow is initiated by the Kafka consumer, I don't know how to achieve the same thing.
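To make the difference concrete, here is a plain-Java model of what I understand is happening. It is only a sketch: nothing in it is Spring Integration API, and every name is made up for illustration. The advice wraps a single step, so a failure in the transformer escapes it, whereas an error channel at the entry point, like the one on my gateway, wraps the whole synchronous chain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Plain-Java model of the call chain; none of this is Spring Integration API. */
final class ErrorScopeModel {

    /** Stands in for the error channel: collects failures instead of propagating them. */
    static final List<String> errorChannel = new ArrayList<>();

    /** Models a transformer that fails on empty input. */
    static String transform(String payload) {
        if (payload.isEmpty()) {
            throw new IllegalArgumentException("transform failed");
        }
        return payload.toUpperCase();
    }

    /** Models the outbound handler (for example the HTTP call) failing for one payload. */
    static String handle(String payload) {
        if (payload.equals("BOOM")) {
            throw new IllegalStateException("handle failed");
        }
        return "sent:" + payload;
    }

    /** Models an advice: wraps exactly one step and diverts that step's exceptions. */
    static Function<String, String> advised(Function<String, String> step) {
        return payload -> {
            try {
                return step.apply(payload);
            } catch (RuntimeException ex) {
                errorChannel.add("advice:" + ex.getMessage());
                return null;
            }
        };
    }

    /** Flow where only the handler is advised, as in my configuration. */
    static String flowWithAdvisedHandler(String payload) {
        String transformed = transform(payload); // not covered by the advice
        return advised(ErrorScopeModel::handle).apply(transformed);
    }

    /** Models an entry point with an error channel: wraps the whole synchronous chain. */
    static String entryPointWithErrorChannel(String payload) {
        try {
            return handle(transform(payload));
        } catch (RuntimeException ex) {
            errorChannel.add("entry:" + ex.getMessage());
            return null;
        }
    }
}
```

In this model, `flowWithAdvisedHandler("")` throws because the transformer is not advised, while `entryPointWithErrorChannel("")` records the failure instead. That second behaviour is what I would like to get for the Kafka-initiated flows.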
Thanks.
EDITED AFTER Artem's RESPONSE TO ADD CLARIFICATION:
This is the configuration of the integration flow that posts a request to a remote service. Its exceptions do not seem to be caught and routed to the errorChannel unless I add an ExpressionEvaluatingRequestHandlerAdvice:
    @Bean
    public IntegrationFlow sendHttpRequestToRemoteServiceFromKafkaEvent() {
        return flow -> flow
                .transform(transformer)
                .handle(getOAuth2Handler(HttpMethod.PUT, "remote url"),
                        e -> e.advice(expressionEvaluatingRequestHandlerAdvice));
    }

    private OAuth2RequestHandler getOAuth2Handler(HttpMethod httpMethod, String url) {
        return new OAuth2RequestHandler(oAuth2RestTemplate, httpMethod, url);
    }
And this is the handleMessage method of the OAuth2RequestHandler class, which implements MessageHandler:
    @Override
    public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
        String requestBody = (String) message.getPayload();
        ResponseEntity<String> response = oAuth2RestTemplate.exchange(url, httpMethod, new HttpEntity<>(requestBody), String.class);
    }