I have completed a "happy-path" (as below).
How I can advise a .transform
call to have it invoke an error flow (via errorChannel) w/o interrupting the mainFlow
?
Currently the mainFlow
terminates on first failure occurrence in second .transform
(when payload cannot be deserialized to type). My desired behavior is that I'd like to log and continue processing.
I've read about ExpressionEvaluatingRequestHandlerAdvice
. Would I just add a second param to each .transform
call like e -> e.advice(myAdviceBean)
and declare such a bean with success
and error
channels? Assuming I'd need to break up my mainFlow
to receive success from each transform.
On some commented direction I updated the original code sample. But I'm still having trouble taking this "all the way home".
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.handler.ServiceActivatingHandler DEBUG handler 'ServiceActivator for [org.springframework.integration.dsl.support.BeanNameMessageProcessor@5f3839ad] (org.springframework.integration.handler.ServiceActivatingHandler#0)' produced no reply for request Message: ErrorMessage [payload=org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice$MessageHandlingExpressionEvaluatingAdviceException: Handler Failed; nested exception is org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "hasDoaCostPriceChanged" (class com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog), not marked as ignorable (18 known properties: "supplierUpdateDate", "fPLOSMaskArrival", "createDate", "endAllowed", "sellStateId", "ratePlanLevel", "ratePlanId", "startAllowed", "stayDate", "doaCostPriceChanged", "hotelId", "logActionTypeId" [truncated]]) at [Source: java.util.zip.GZIPInputStream@242017b8; line: 1, column: 32] (through reference chain: com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog["hasDoaCostPriceChanged"]), headers={id=c054d976-5750-827f-8894-51aba9655c77, timestamp=1441738159660}] 2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG postSend (sent=true) on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice$MessageHandlingExpressionEvaluatingAdviceException: Handler Failed; nested exception is org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "hasDoaCostPriceChanged" (class com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog), not marked as ignorable (18 known properties: "supplierUpdateDate", "fPLOSMaskArrival", "createDate", "endAllowed", "sellStateId", "ratePlanLevel", "ratePlanId", "startAllowed", "stayDate", "doaCostPriceChanged", "hotelId", "logActionTypeId" [truncated]]) at [Source: java.util.zip.GZIPInputStream@242017b8; line: 1, column: 32] (through reference chain: com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog["hasDoaCostPriceChanged"]), headers={id=c054d976-5750-827f-8894-51aba9655c77, timestamp=1441738159660}] 2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG preSend on channel 'mainFlow.channel#3', message: GenericMessage [payload=java.util.zip.GZIPInputStream@242017b8, headers={id=b80106f9-7f4c-1b92-6aca-6e73d3bf8792, timestamp=1441738159664}] 2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.aggregator.AggregatingMessageHandler DEBUG org.springframework.integration.aggregator.AggregatingMessageHandler#0 received message: GenericMessage [payload=java.util.zip.GZIPInputStream@242017b8, headers={id=b80106f9-7f4c-1b92-6aca-6e73d3bf8792, timestamp=1441738159664}] 2015-09-08 11:49:19,665 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?, headers={id=24e3a1c7-af6b-032c-6a29-b55031fba0d7, timestamp=1441738159665}] 2015-09-08 11:49:19,665 [pool-3-thread-1] org.springframework.integration.handler.ServiceActivatingHandler DEBUG ServiceActivator for [org.springframework.integration.dsl.support.BeanNameMessageProcessor@5f3839ad] (org.springframework.integration.handler.ServiceActivatingHandler#0) received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?, headers={id=24e3a1c7-af6b-032c-6a29-b55031fba0d7, timestamp=1441738159665}] 2015-09-08 11:49:19,665 [pool-3-thread-1] com.xxx.DataMigrationModule$ErrorService ERROR org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing? at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84) at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101) at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97) at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95) at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231) at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154) at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101) at org.springframework.integration.dispatcher.UnicastingDispatcher.access$000(UnicastingDispatcher.java:48) at org.springframework.integration.dispatcher.UnicastingDispatcher$1.run(UnicastingDispatcher.java:92) at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing? at org.springframework.util.Assert.state(Assert.java:385) at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:369) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) ... 22 more
UPDATED (09-08-2015)
code sample
@Bean
public IntegrationFlow mainFlow() {
// @formatter:off
return IntegrationFlows
.from(
amazonS3InboundSynchronizationMessageSource(),
e -> e.poller(p -> p.trigger(this::nextExecutionTime))
)
.transform(unzipTransformer())
.split(f -> new FileSplitter())
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))
.transform(Transformers.fromJson(persistentType()), , e -> e.advice(handlingAdvice()))
// @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
.aggregate(a ->
a.releaseStrategy(g -> g.size() == persistenceBatchSize)
.expireGroupsUponCompletion(true)
.sendPartialResultOnExpiry(true)
.groupTimeoutExpression("size() ge 2 ? 10000 : -1")
, null
)
.handle(jdbcRepositoryHandler())
// TODO add advised PollableChannel to deal with possible persistence issue and retry with partial batch
.get();
// @formatter:on
}
@Bean
public ErrorService errorService() {
return new ErrorService();
}
@Bean
public MessageChannel customErrorChannel() {
return MessageChannels.direct().get();
}
@Bean
public IntegrationFlow customErrorFlow() {
// @formatter:off
return IntegrationFlows
.from(customErrorChannel())
.handle("errorService", "handleError")
.get();
// @formatter:on
}
@Bean
ExpressionEvaluatingRequestHandlerAdvice handlingAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnFailureExpression("payload");
advice.setFailureChannel(customErrorChannel());
advice.setReturnFailureExpressionResult(true);
advice.setTrapException(true);
return advice;
}
protected class ErrorService implements ErrorHandler {
private final Logger log = LoggerFactory.getLogger(getClass());
@Override
public void handleError(Throwable t) {
stopEndpoints(t);
}
private void stopEndpoints(Throwable t) {
log.error(ExceptionUtils.getStackTrace(t));
}
}