I do not fully understand how to change the error channel for my whole integration flow. I need to handle exceptions such as InvalidAccessTokenException that can be thrown in a subflow inside the router.
So far I have tried to handle exceptions from the default channel "errorChannel" like this:
    @Bean
    public IntegrationFlow errorFlow() {
        return IntegrationFlows.from("errorChannel")
                .handle("errorService", "handleError")
                .get();
    }
The error is handled by a method with the following signature:
    void handleError(Message<Exception> exception)
But the default behavior persists: the stack trace is still printed to the console.
So my question is: how can I configure an error channel in the Java DSL? Is it possible to map a group of exceptions to a particular error channel, so that the service handling that group is more cohesive?
My integration flow is configured as follows:
    @Configuration
    @IntegrationComponentScan
    public class InfrastructureConfiguration {

        private static Logger logger = LoggerFactory.getLogger(InfrastructureConfiguration.class);

        @Autowired
        private IFacebookService facebookService;

        @Autowired
        private IInstagramService instagramService;

        @Autowired
        private IYoutubeService youtubeService;

        /**
         * The Pollers builder factory can be used to configure common bean definitions or
         * those created from IntegrationFlowBuilder EIP-methods
         */
        @Bean(name = PollerMetadata.DEFAULT_POLLER)
        public PollerMetadata poller() {
            return Pollers.fixedDelay(10, TimeUnit.SECONDS).get();
        }

        @Bean
        public TaskExecutor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(5);
            executor.setMaxPoolSize(10);
            executor.setQueueCapacity(25);
            return executor;
        }

        /**
         * MongoDbMessageSource is an instance of MessageSource which returns a Message with a payload
         * which is the result of execution of a Query
         */
        @Bean
        @Autowired
        public MessageSource<Object> mongoMessageSource(MongoDbFactory mongo) {
            MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{}"));
            messageSource.setExpectSingleResult(false);
            messageSource.setEntityClass(UserEntity.class);
            messageSource.setCollectionNameExpression(new LiteralExpression("users"));
            return messageSource;
        }

        @Bean
        @ServiceActivator(inputChannel = "storeChannel")
        public MessageHandler mongodbAdapter(MongoDbFactory mongo) throws Exception {
            MongoDbStoringMessageHandler adapter = new MongoDbStoringMessageHandler(mongo);
            adapter.setCollectionNameExpression(new LiteralExpression("comments"));
            return adapter;
        }

        @Bean
        public IntegrationFlow errorFlow() {
            return IntegrationFlows.from("errorChannel")
                    .handle("errorService", "handleError")
                    .get();
        }

        @Bean
        @Autowired
        public IntegrationFlow processUsers(MongoDbFactory mongo, PollerMetadata poller) {
            return IntegrationFlows.from(mongoMessageSource(mongo), c -> c.poller(poller))
                    .<List<UserEntity>, Map<ObjectId, List<SocialMediaEntity>>>transform(userEntitiesList
                            -> userEntitiesList.stream().collect(Collectors.toMap(UserEntity::getId, UserEntity::getSocialMedia))
                    )
                    .split(new AbstractMessageSplitter() {
                        @Override
                        protected Object splitMessage(Message<?> msg) {
                            return ((Map<ObjectId, List<SocialMediaEntity>>) msg.getPayload()).entrySet();
                        }
                    })
                    .channel("directChannel_1")
                    .enrichHeaders(s -> s.headerExpressions(h -> h.put("user-id", "payload.key")))
                    .split(new AbstractMessageSplitter() {
                        @Override
                        protected Object splitMessage(Message<?> msg) {
                            return ((Entry<ObjectId, List<SocialMediaEntity>>) msg.getPayload()).getValue();
                        }
                    })
                    .channel(MessageChannels.executor("executorChannel", this.taskExecutor()))
                    .<SocialMediaEntity, SocialMediaTypeEnum>route(p -> p.getType(),
                            m -> m.subFlowMapping(SocialMediaTypeEnum.FACEBOOK,
                                            sf -> sf.handle(SocialMediaEntity.class, (p, h) -> facebookService.getComments(p.getAccessToken())))
                                    .subFlowMapping(SocialMediaTypeEnum.YOUTUBE,
                                            sf -> sf.handle(SocialMediaEntity.class, (p, h) -> youtubeService.getComments(p.getAccessToken())))
                                    .subFlowMapping(SocialMediaTypeEnum.INSTAGRAM,
                                            sf -> sf.handle(SocialMediaEntity.class, (p, h) -> instagramService.getComments(p.getAccessToken())))
                    )
                    .channel("directChannel_2")
                    .aggregate()
                    .channel("directChannel_3")
                    .<List<List<CommentEntity>>, List<CommentEntity>>transform(comments ->
                            comments.stream().flatMap(List::stream).collect(Collectors.toList()))
                    .aggregate()
                    .channel("directChannel_4")
                    .<List<List<CommentEntity>>, List<CommentEntity>>transform(comments ->
                            comments.stream().flatMap(List::stream).collect(Collectors.toList()))
                    .channel("storeChannel")
                    .get();
        }

        @PostConstruct
        protected void init() {
            Assert.notNull(facebookService, "The Facebook Service can not be null");
            Assert.notNull(instagramService, "The Instagram Service can not be null");
            Assert.notNull(youtubeService, "The Youtube Service can not be null");
        }
    }
Here is an example of an exception that any of the social network services can throw:
    public class InvalidAccessTokenException extends RuntimeException {

        private SocialMediaTypeEnum socialMediaType;
        private String accessToken;

        public InvalidAccessTokenException(SocialMediaTypeEnum socialMediaType, String accessToken) {
            this.socialMediaType = socialMediaType;
            this.accessToken = accessToken;
        }

        public SocialMediaTypeEnum getSocialMediaType() {
            return socialMediaType;
        }

        public String getAccessToken() {
            return accessToken;
        }
    }
Is it possible to bind this exception to a particular error channel?
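To make the question concrete, here is a minimal plain-Java sketch of the behavior I am after. It is not Spring Integration code: the class `ExceptionChannelMappingSketch`, its methods, and the channel name "invalidTokenChannel" are all hypothetical, and the nested exception is only a stand-in for the class above. The sketch walks the cause chain because the error seems to arrive wrapped in another exception rather than as the InvalidAccessTokenException itself. Spring Integration's ErrorMessageExceptionTypeRouter may be the built-in way to get this kind of mapping, but the sketch does not use it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration only; none of these names come from Spring Integration.
class ExceptionChannelMappingSketch {

    // Stand-in for the InvalidAccessTokenException shown above (fields omitted).
    static class InvalidAccessTokenException extends RuntimeException { }

    // Exception type -> name of the error channel that should receive it.
    private final Map<Class<? extends Throwable>, String> mappings = new LinkedHashMap<>();
    private final String defaultChannel;

    ExceptionChannelMappingSketch(String defaultChannel) {
        this.defaultChannel = defaultChannel;
    }

    // Registers a mapping and returns this instance so calls can be chained.
    ExceptionChannelMappingSketch map(Class<? extends Throwable> type, String channelName) {
        mappings.put(type, channelName);
        return this;
    }

    // Walks the cause chain from the outermost wrapper inward and returns the
    // channel of the first mapped type that matches; falls back to the default.
    String resolve(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            for (Map.Entry<Class<? extends Throwable>, String> entry : mappings.entrySet()) {
                if (entry.getKey().isInstance(t)) {
                    return entry.getValue();
                }
            }
        }
        return defaultChannel;
    }
}
```

With a mapping from InvalidAccessTokenException to "invalidTokenChannel", a wrapper exception whose cause is an InvalidAccessTokenException would resolve to "invalidTokenChannel", and any unmapped exception would fall back to the default channel.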
Thanks in advance.
I have tried changing the error channel using the PollerSpec as follows:
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(10, TimeUnit.SECONDS)
                .errorChannel("customErrorChannel")
                .get();
    }
But the messages still go to the default channel 'errorChannel'.
Here is an excerpt of the log messages:
2017-07-25 20:20:51.922 DEBUG 3268 --- [ taskExecutor-4] o.s.i.channel.PublishSubscribeChannel : preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is sanchez.sanchez.sergio.exception.InvalidAccessTokenException, failedMessage=GenericMessage [payload=SocialMediaEntity{id=59778bf93681ac0cc4e20089, accessToken=maite_access_token_facebook, type=FACEBOOK, invalidToken=false}, headers={sequenceNumber=1, sequenceDetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 2, 5]], mongo_collectionName=users, sequenceSize=3, user-id=59778bf93681ac0cc4e20094, correlationId=6665f8ee-2bc9-e6c0-17de-35d63a0afaaa, id=6925b3ea-0b30-3304-b7f0-d235959d7db1, timestamp=1501006851466}], headers={id=6eb1789c-21d7-1814-48ee-77553abd99b9, timestamp=1501006851922}]
2017-07-25 20:20:51.923 DEBUG 3268 --- [ taskExecutor-5] o.s.integration.handler.LoggingHandler : _org.springframework.integration.errorLogger.handler received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is sanchez.sanchez.sergio.exception.InvalidAccessTokenException, failedMessage=GenericMessage [payload=SocialMediaEntity{id=59778bf93681ac0cc4e2008c, accessToken=david_access_token_facebook, type=FACEBOOK, invalidToken=false}, headers={sequenceNumber=1, sequenceDetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 4, 5]], mongo_collectionName=users, sequenceSize=3, user-id=59778bf93681ac0cc4e20095, correlationId=254f918f-52d2-1cff-7ba5-7343a39a8941, id=3a5989df-2af7-56d0-2697-5fcaf75a2891, timestamp=1501006851527}], headers={id=c270d1ef-68ac-cb6b-245f-11e567b5b7e8, timestamp=1501006851922}]
2017-07-25 20:20:51.922 DEBUG 3268 --- [ taskExecutor-3] o.s.integration.handler.LoggingHandler : _org.springframework.integration.errorLogger.handler received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is sanchez.sanchez.sergio.exception.InvalidAccessTokenException, failedMessage=GenericMessage [payload=SocialMediaEntity{id=59778bf93681ac0cc4e2008f, accessToken=elena_access_token_facebook, type=FACEBOOK, invalidToken=false}, headers={sequenceNumber=1, sequenceDetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 5, 5]], mongo_collectionName=users, sequenceSize=3, user-id=59778bf93681ac0cc4e20096, correlationId=1aba58bf-733e-f1ed-a82e-67f482ff3531, id=5eb9c517-f451-e2d6-938d-e436bb362aef, timestamp=1501006851549}], headers={id=30a965d3-f829-d6cf-c971-7b4ed2f15f88, timestamp=1501006851922}]
2017-07-25 20:20:51.923 DEBUG 3268 --- [ taskExecutor-4] o.s.integration.handler.LoggingHandler : _org.springframework.integration.errorLogger.handler received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is sanchez.sanchez.sergio.exception.InvalidAccessTokenException, failedMessage=GenericMessage [payload=SocialMediaEntity{id=59778bf93681ac0cc4e20089, accessToken=maite_access_token_facebook, type=FACEBOOK, invalidToken=false}, headers={sequenceNumber=1, sequenceDetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 2, 5]], mongo_collectionName=users, sequenceSize=3, user-id=59778bf93681ac0cc4e20094, correlationId=6665f8ee-2bc9-e6c0-17de-35d63a0afaaa, id=6925b3ea-0b30-3304-b7f0-d235959d7db1, timestamp=1501006851466}], headers={id=6eb1789c-21d7-1814-48ee-77553abd99b9, timestamp=1501006851922}]
2017-07-25 20:20:51.927 ERROR 3268 --- [ taskExecutor-3] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: nested exception is sanchez.sanchez.sergio.exception.InvalidAccessTokenException, failedMessage=GenericMessage [payload=SocialMediaEntity{id=59778bf93681ac0cc4e2008f, accessToken=elena_access_token_facebook, type=FACEBOOK, invalidToken=false}, headers={sequenceNumber=1, sequenceDetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 5, 5]], mongo_collectionName=users, sequenceSize=3, user-id=59778bf93681ac0cc4e20096, correlationId=1aba58bf-733e-f1ed-a82e-67f482ff3531, id=5eb9c517-f451-e2d6-938d-e436bb362aef, timestamp=1501006851549}]
at org.springframework.integration.dsl.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:130)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:89)
I have tried two new approaches to change the default error channel:
1. Create a custom ErrorHandler with the error channel and declare it in the PollerSpec:
    @Bean
    public MessageChannel customErrorChannel() {
        return MessageChannels.direct("customErrorChannel").get();
    }

    @Bean
    public ErrorHandler errorHandler() {
        MessagePublishingErrorHandler messagePublishingErrorHandler = new MessagePublishingErrorHandler();
        messagePublishingErrorHandler.setDefaultErrorChannel(customErrorChannel());
        return messagePublishingErrorHandler;
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(10, TimeUnit.SECONDS)
                .errorHandler(errorHandler())
                .get();
    }
With this approach the error messages still go to the default channel 'errorChannel'.
2. Add the MessageHeaders.ERROR_CHANNEL header to explicitly indicate the error channel:
    .enrichHeaders(s -> s.headerExpressions(h -> h.put("user-id", "payload.key"))
            .header(MessageHeaders.ERROR_CHANNEL, "customErrorChannel")
    )
This approach does work: error messages are directed to "customErrorChannel".
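My working assumption about why the header makes a difference (not verified against the Spring Integration source) is that the error handler first looks for an error channel header on the failed message and only falls back to the default channel when none is present. The plain-Java sketch below only illustrates that assumed lookup order. The class and method names are hypothetical, and a plain Map stands in for the message headers.

```java
import java.util.Map;

// Plain-Java illustration of the assumed lookup order; not Spring Integration code.
class ErrorChannelResolutionSketch {

    // Header key behind MessageHeaders.ERROR_CHANNEL.
    static final String ERROR_CHANNEL_HEADER = "errorChannel";

    // Returns the channel name from the failed message's header when it is a
    // non-empty String, otherwise the supplied default channel name.
    static String resolveErrorChannel(Map<String, Object> failedMessageHeaders,
                                      String defaultErrorChannel) {
        Object header = failedMessageHeaders.get(ERROR_CHANNEL_HEADER);
        if (header instanceof String && !((String) header).isEmpty()) {
            return (String) header;
        }
        return defaultErrorChannel;
    }
}
```

If that assumption holds, it would explain what I see: messages enriched with the header end up in "customErrorChannel", while messages without it fall through to the default 'errorChannel'.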