
I do not fully understand how to change the error channel for my whole integration flow. I need to handle exceptions such as InvalidAccessTokenException, which can be thrown in a subflow inside the router.

I tried handling exceptions from the default channel "errorChannel" like this:

@Bean
public IntegrationFlow errorFlow() {
   return IntegrationFlows.from("errorChannel")
       .handle("errorService", "handleError")
       .get();
}

The error is handled by a method with the following signature:

void handleError(Message<Exception> exception)

But the default behavior persists: the stack trace is still printed to the console.

So my question is: how can I configure an error channel in the Java DSL? Is it possible to map a group of exceptions to a particular error channel, so that the service handling that group is more cohesive?

My integration flow is configured as follows:

@Configuration
@IntegrationComponentScan
public class InfrastructureConfiguration {

    private static Logger logger = LoggerFactory.getLogger(InfrastructureConfiguration.class);

    @Autowired
    private IFacebookService facebookService;

    @Autowired
    private IInstagramService instagramService;

    @Autowired
    private IYoutubeService youtubeService;

    /**
     * The Pollers builder factory can be used to configure common bean definitions or 
     * those created from IntegrationFlowBuilder EIP-methods
     */
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(10, TimeUnit.SECONDS).get();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        return executor;
    }

    /**
     * MongoDbMessageSource is an instance of MessageSource which returns a Message with a payload 
     * which is the result of execution of a Query
     */
    @Bean
    @Autowired
    public MessageSource<Object> mongoMessageSource(MongoDbFactory mongo) {
        MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{}"));
        messageSource.setExpectSingleResult(false);
        messageSource.setEntityClass(UserEntity.class);
        messageSource.setCollectionNameExpression(new LiteralExpression("users"));
        return messageSource;
    }

    @Bean
    @ServiceActivator(inputChannel = "storeChannel")
    public MessageHandler mongodbAdapter(MongoDbFactory mongo) throws Exception {
        MongoDbStoringMessageHandler adapter = new MongoDbStoringMessageHandler(mongo);
        adapter.setCollectionNameExpression(new LiteralExpression("comments"));
        return adapter;
    }

    @Bean
    public IntegrationFlow errorFlow() {
        return IntegrationFlows.from("errorChannel")
                .handle("errorService", "handleError")
                .get();
    }


    @Bean
    @Autowired
    public IntegrationFlow processUsers(MongoDbFactory mongo, PollerMetadata poller) {
        return IntegrationFlows.from(mongoMessageSource(mongo), c -> c.poller(poller))
                .<List<UserEntity>, Map<ObjectId, List<SocialMediaEntity>>>transform(userEntitiesList
                        -> userEntitiesList.stream().collect(Collectors.toMap(UserEntity::getId, UserEntity::getSocialMedia))
                )
                .split(new AbstractMessageSplitter() {
                    @Override
                    protected Object splitMessage(Message<?> msg) {
                        return ((Map<ObjectId, List<SocialMediaEntity>>) msg.getPayload()).entrySet();
                    }
                })
                .channel("directChannel_1")
                .enrichHeaders(s -> s.headerExpressions(h -> h.put("user-id", "payload.key")))
                .split(new AbstractMessageSplitter() {
                    @Override
                    protected Object splitMessage(Message<?> msg) {
                        return ((Entry<ObjectId, List<SocialMediaEntity>>) msg.getPayload()).getValue();
                    }
                })
                .channel(MessageChannels.executor("executorChannel", this.taskExecutor()))
                .<SocialMediaEntity, SocialMediaTypeEnum>route(p -> p.getType(),
                        m
                        -> m.subFlowMapping(SocialMediaTypeEnum.FACEBOOK, 
                                sf -> sf.handle(SocialMediaEntity.class, (p, h) -> facebookService.getComments(p.getAccessToken())))
                            .subFlowMapping(SocialMediaTypeEnum.YOUTUBE, 
                                sf -> sf.handle(SocialMediaEntity.class, (p, h) -> youtubeService.getComments(p.getAccessToken())))
                            .subFlowMapping(SocialMediaTypeEnum.INSTAGRAM, 
                                sf -> sf.handle(SocialMediaEntity.class, (p, h) -> instagramService.getComments(p.getAccessToken())))
                )
                .channel("directChannel_2")
                .aggregate()
                .channel("directChannel_3")
                .<List<List<CommentEntity>>, List<CommentEntity>>transform(comments -> 
                        comments.stream().flatMap(List::stream).collect(Collectors.toList()))
                .aggregate()
                .channel("directChannel_4")
                .<List<List<CommentEntity>>, List<CommentEntity>>transform(comments -> 
                        comments.stream().flatMap(List::stream).collect(Collectors.toList()))
                .channel("storeChannel")
                .get();
    }


    @PostConstruct
    protected void init(){
        Assert.notNull(facebookService, "The Facebook Service can not be null");
        Assert.notNull(instagramService, "The Instagram Service can not be null");
        Assert.notNull(youtubeService, "The Youtube Service can not be null");
    }

}

Here is an example of an exception that any of the social network services can throw:

public class InvalidAccessTokenException extends RuntimeException {

    private SocialMediaTypeEnum socialMediaType;
    private String accessToken;

    public InvalidAccessTokenException(SocialMediaTypeEnum socialMediaType, String accessToken) {
        this.socialMediaType = socialMediaType;
        this.accessToken = accessToken;
    }

    public SocialMediaTypeEnum getSocialMediaType() {
        return socialMediaType;
    }

    public String getAccessToken() {
        return accessToken;
    }
}

Is it possible to bind this exception to a particular error channel?

Thanks in advance.

I have tried changing the error channel using the PollerSpec as follows:

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
    return Pollers.fixedDelay(10, TimeUnit.SECONDS)
          .errorChannel("customErrorChannel")
         .get();
}

But the error messages still go to the default channel 'errorChannel'.

Here is an excerpt of the log messages:

2017-07-25 20:20:51.922 DEBUG 3268 --- [ taskExecutor-4] o.s.i.channel.PublishSubscribeChannel    : preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is sanchez.sanchez.sergio.exception.InvalidAccessTokenException, failedMessage=GenericMessage [payload=SocialMediaEntity{id=59778bf93681ac0cc4e20089, accessToken=maite_access_token_facebook, type=FACEBOOK, invalidToken=false}, headers={sequenceNumber=1, sequenceDetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 2, 5]], mongo_collectionName=users, sequenceSize=3, user-id=59778bf93681ac0cc4e20094, correlationId=6665f8ee-2bc9-e6c0-17de-35d63a0afaaa, id=6925b3ea-0b30-3304-b7f0-d235959d7db1, timestamp=1501006851466}], headers={id=6eb1789c-21d7-1814-48ee-77553abd99b9, timestamp=1501006851922}]
2017-07-25 20:20:51.923 DEBUG 3268 --- [ taskExecutor-5] o.s.integration.handler.LoggingHandler   : _org.springframework.integration.errorLogger.handler received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is sanchez.sanchez.sergio.exception.InvalidAccessTokenException, failedMessage=GenericMessage [payload=SocialMediaEntity{id=59778bf93681ac0cc4e2008c, accessToken=david_access_token_facebook, type=FACEBOOK, invalidToken=false}, headers={sequenceNumber=1, sequenceDetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 4, 5]], mongo_collectionName=users, sequenceSize=3, user-id=59778bf93681ac0cc4e20095, correlationId=254f918f-52d2-1cff-7ba5-7343a39a8941, id=3a5989df-2af7-56d0-2697-5fcaf75a2891, timestamp=1501006851527}], headers={id=c270d1ef-68ac-cb6b-245f-11e567b5b7e8, timestamp=1501006851922}]
2017-07-25 20:20:51.922 DEBUG 3268 --- [ taskExecutor-3] o.s.integration.handler.LoggingHandler   : _org.springframework.integration.errorLogger.handler received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is sanchez.sanchez.sergio.exception.InvalidAccessTokenException, failedMessage=GenericMessage [payload=SocialMediaEntity{id=59778bf93681ac0cc4e2008f, accessToken=elena_access_token_facebook, type=FACEBOOK, invalidToken=false}, headers={sequenceNumber=1, sequenceDetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 5, 5]], mongo_collectionName=users, sequenceSize=3, user-id=59778bf93681ac0cc4e20096, correlationId=1aba58bf-733e-f1ed-a82e-67f482ff3531, id=5eb9c517-f451-e2d6-938d-e436bb362aef, timestamp=1501006851549}], headers={id=30a965d3-f829-d6cf-c971-7b4ed2f15f88, timestamp=1501006851922}]
2017-07-25 20:20:51.923 DEBUG 3268 --- [ taskExecutor-4] o.s.integration.handler.LoggingHandler   : _org.springframework.integration.errorLogger.handler received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is sanchez.sanchez.sergio.exception.InvalidAccessTokenException, failedMessage=GenericMessage [payload=SocialMediaEntity{id=59778bf93681ac0cc4e20089, accessToken=maite_access_token_facebook, type=FACEBOOK, invalidToken=false}, headers={sequenceNumber=1, sequenceDetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 2, 5]], mongo_collectionName=users, sequenceSize=3, user-id=59778bf93681ac0cc4e20094, correlationId=6665f8ee-2bc9-e6c0-17de-35d63a0afaaa, id=6925b3ea-0b30-3304-b7f0-d235959d7db1, timestamp=1501006851466}], headers={id=6eb1789c-21d7-1814-48ee-77553abd99b9, timestamp=1501006851922}]
2017-07-25 20:20:51.927 ERROR 3268 --- [ taskExecutor-3] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: nested exception is sanchez.sanchez.sergio.exception.InvalidAccessTokenException, failedMessage=GenericMessage [payload=SocialMediaEntity{id=59778bf93681ac0cc4e2008f, accessToken=elena_access_token_facebook, type=FACEBOOK, invalidToken=false}, headers={sequenceNumber=1, sequenceDetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 5, 5]], mongo_collectionName=users, sequenceSize=3, user-id=59778bf93681ac0cc4e20096, correlationId=1aba58bf-733e-f1ed-a82e-67f482ff3531, id=5eb9c517-f451-e2d6-938d-e436bb362aef, timestamp=1501006851549}]
    at org.springframework.integration.dsl.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:130)
    at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:89)

I have tried two new approaches to change the default error channel:

  • Create a custom ErrorHandler with the error channel and declare it in the PollerSpec:

    @Bean
    public MessageChannel customErrorChannel() {
        return MessageChannels.direct("customErrorChannel").get();
    }

    @Bean
    public ErrorHandler errorHandler() {
        MessagePublishingErrorHandler messagePublishingErrorHandler = new MessagePublishingErrorHandler();
        messagePublishingErrorHandler.setDefaultErrorChannel(customErrorChannel());
        return messagePublishingErrorHandler;
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(10, TimeUnit.SECONDS)
                .errorHandler(errorHandler())
                .get();
    }

The error messages are still going to the default channel 'errorChannel'.

  • Adding the MessageHeaders.ERROR_CHANNEL header to explicitly indicate the error channel:

    .enrichHeaders(s -> s.headerExpressions(h -> h.put("user-id", "payload.key"))
            .header(MessageHeaders.ERROR_CHANNEL, "customErrorChannel")
    )

This approach does work: error messages are directed to the "customErrorChannel".

1 Answer

Yes, you can do that. There is an ErrorMessageExceptionTypeRouter for this kind of task, so you can route your InvalidAccessTokenException to a specific channel.
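To make the routing idea concrete, here is a minimal plain-Java sketch of routing by exception type. It is only a model of the concept, not Spring's implementation: the class ExceptionRoutingSketch, the method resolveChannel, the WrapperException stand-in and the channel names are all hypothetical. It walks the cause chain, which matters in your case because the log shows InvalidAccessTokenException arriving wrapped in a MessageHandlingException.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ExceptionRoutingSketch {

    // Hypothetical stand-in for the exception from the question.
    static class InvalidAccessTokenException extends RuntimeException { }

    // Hypothetical stand-in for a framework exception that wraps the real cause.
    static class WrapperException extends RuntimeException {
        WrapperException(Throwable cause) {
            super(cause);
        }
    }

    // Walks the cause chain and returns the channel mapped to the deepest
    // matching exception, or the default channel when nothing matches.
    static String resolveChannel(Throwable error,
                                 Map<Class<? extends Throwable>, String> mappings,
                                 String defaultChannel) {
        String match = null;
        for (Throwable t = error; t != null; t = t.getCause()) {
            for (Map.Entry<Class<? extends Throwable>, String> entry : mappings.entrySet()) {
                if (entry.getKey().isInstance(t)) {
                    match = entry.getValue();
                }
            }
        }
        return match != null ? match : defaultChannel;
    }

    public static void main(String[] args) {
        Map<Class<? extends Throwable>, String> mappings = new LinkedHashMap<>();
        mappings.put(InvalidAccessTokenException.class, "invalidTokenChannel");

        Throwable wrapped = new WrapperException(new InvalidAccessTokenException());
        System.out.println(resolveChannel(wrapped, mappings, "errorChannel"));
        System.out.println(resolveChannel(new IllegalStateException(), mappings, "errorChannel"));
    }
}
```

With a mapping like this, a whole group of related exceptions can share one channel by mapping a common superclass instead of each concrete type.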

Also be aware that the PollerSpec can be supplied with an errorChannel(), so not all of your exceptions have to go to the default errorChannel.

UPDATE

OK. After investigating your code, I see this:

.channel(MessageChannels.executor("executorChannel", this.taskExecutor()))

That means you shift the message to a different thread, so any exception thrown there is already far away from the try...catch in the poller's algorithm that would send it to your customErrorChannel.
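The thread handoff can be reproduced without Spring. The following is a small plain-Java sketch, with the hypothetical class name ThreadHandoffSketch, in which the calling thread plays the role of the poller and a second thread plays the role of the executor channel. The try...catch around the handoff never sees the exception; only a handler attached to the worker thread does.

```java
import java.util.concurrent.atomic.AtomicReference;

class ThreadHandoffSketch {

    // Returns { what the caller's catch block saw, what the worker-side handler saw }.
    static String[] run() {
        AtomicReference<String> seenByCaller = new AtomicReference<>("nothing");
        AtomicReference<String> seenOnWorker = new AtomicReference<>("nothing");

        // The task fails on the worker thread, not on the calling thread.
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("invalid token");
        });
        worker.setUncaughtExceptionHandler(
                (thread, error) -> seenOnWorker.set(error.getMessage()));

        try {
            // Plays the role of the poller: it hands the work off and returns immediately.
            worker.start();
        } catch (RuntimeException e) {
            // Never reached for failures inside the task.
            seenByCaller.set(e.getMessage());
        }

        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new String[] { seenByCaller.get(), seenOnWorker.get() };
    }

    public static void main(String[] args) {
        String[] result = run();
        System.out.println("caller saw: " + result[0]);
        System.out.println("worker saw: " + result[1]);
    }
}
```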

The ExecutorChannel has this logic:

    if (!(this.executor instanceof ErrorHandlingTaskExecutor)) {
        ErrorHandler errorHandler = new MessagePublishingErrorHandler(
                new BeanFactoryChannelResolver(this.getBeanFactory()));
        this.executor = new ErrorHandlingTaskExecutor(this.executor, errorHandler);
    }

That MessagePublishingErrorHandler uses the errorChannel by default. What you can do here is declare your taskExecutor() bean in a similar way, as an ErrorHandlingTaskExecutor, and inject your customErrorChannel into its MessagePublishingErrorHandler.

Another option that should work with the MessagePublishingErrorHandler is to populate the errorChannel header upstream of the executorChannel definition.
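The wrapping that the ExecutorChannel performs is a plain decorator, so the idea can be sketched in stdlib Java. This is a model under assumptions, not Spring's ErrorHandlingTaskExecutor: the class ErrorHandlingExecutorSketch and the list standing in for customErrorChannel are hypothetical. The point is that whoever supplies the error handler decides where failures from the worker thread end up.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ErrorHandlingExecutorSketch implements Executor {

    private final Executor delegate;
    private final Consumer<Throwable> errorHandler;

    ErrorHandlingExecutorSketch(Executor delegate, Consumer<Throwable> errorHandler) {
        this.delegate = delegate;
        this.errorHandler = errorHandler;
    }

    @Override
    public void execute(Runnable task) {
        // Wrap the task so failures are caught on the worker thread
        // and handed to the configured handler.
        delegate.execute(() -> {
            try {
                task.run();
            } catch (Throwable t) {
                errorHandler.accept(t);
            }
        });
    }

    // Runs one failing task and returns what reached the custom handler.
    static List<String> demo() {
        // Stand-in for a custom error channel: it just collects messages.
        List<String> customErrorChannel = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Executor executor = new ErrorHandlingExecutorSketch(
                pool, error -> customErrorChannel.add(error.getMessage()));

        executor.execute(() -> {
            throw new IllegalStateException("invalid token");
        });

        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(customErrorChannel);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the Spring case the equivalent step would be to hand the ExecutorChannel an executor that is already wrapped this way, so that its instanceof check skips the default wrapping.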