
I need to invoke the REST APIs of two systems, A and B, in parallel, aggregate their responses, and transform the result into a single FinalResponse.

To achieve this, I am using the Spring Integration splitter and aggregator, configured as shown below.

I have exposed a REST endpoint. When a request arrives at the controller (the request carries a correlationId in its header), we invoke the gateway, and the splitter sends requests to channels A and B. Service activator A listens on channel A and invokes system A's REST call; service activator B listens on channel B and invokes system B's REST call. I then aggregate the responses from systems A and B and transform them into a FinalResponse. The aggregation and transformation currently work fine.

Sometimes, when multiple requests reach the controller, the FinalResponse takes longer than it does for a single request. The responses to all of the requests come back at almost the same time, even though the last request was sent 6-7 seconds after the first, and I am not sure why. Is something wrong in my configuration related to threads? Also, I am not using any CorrelationStrategy; do I need one? Will I face any issues in a multi-threaded environment with the configuration below? Any feedback on the configuration would be helpful.

// Controller

     {
     FinalResponse aggregatedResponse = gateway.collateServiceInformation(inputData);

     }

     //Configuration 

     @Autowired
     Transformer transformer;

     //Gateway
     @Bean
        public IntegrationFlow gatewayServiceFlow() {
            return IntegrationFlows.from("input_channel")
                    .channel("split_channel").get();
        }

//splitter
     @Bean
     public IntegrationFlow splitAggregatorFlow() {
            return IntegrationFlows.from("split_channel")
                    .split(SomeClass.class, SomeClass::getResources)
                    .channel(c -> c.executor(Executors.newCachedThreadPool()))
                    .<Resource, String>route(Resource::getName,
                            mapping -> mapping.channelMapping("A", "A")
                                    .channelMapping("B", "B"))
                    .get();

        }

//aggregator
        @Bean
        public IntegrationFlow aggregateFlow() {
            return IntegrationFlows.from("aggregate_channel").aggregate()
                    .channel("transform_channel").transform(transformer).get();
        }
        .
        .
        .
    //Transformer   
    @Component
    @Scope("prototype")
    public class Transformer {

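     // Fully qualified because the simple name Transformer refers to this class here
     @org.springframework.integration.annotation.Transformer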
        public FinalResponse transform(final List<Result> responsesFromAAndB) {
        //transformation logic and then return final response
       }
    }

1 Answer

By default, the splitter populates the correlation details (correlation ID, sequence number, and sequence size) in the message headers, and the aggregator uses them afterwards, so you don't need a custom CorrelationStrategy for this. What you describe is called scatter-gather: https://docs.spring.io/spring-integration/docs/5.0.8.RELEASE/reference/html/messaging-routing-chapter.html#scatter-gather. There is a Java DSL equivalent.
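To make the default correlation idea concrete, here is a minimal plain-Java model of it. This is only a sketch, not Spring Integration's actual implementation: the names `CorrelationSketch`, `Part`, and `Aggregator` are hypothetical, and the real framework carries these details in message headers rather than fields. Each split part is stamped with a shared correlation ID and the total sequence size, and the aggregator releases a group once it has received that many parts.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of splitter/aggregator correlation.
public class CorrelationSketch {

    /** One split item plus the sequence details a splitter would add. */
    public static final class Part {
        public final String correlationId;
        public final int sequenceNumber;
        public final int sequenceSize;
        public final String payload;

        public Part(String correlationId, int sequenceNumber, int sequenceSize, String payload) {
            this.correlationId = correlationId;
            this.sequenceNumber = sequenceNumber;
            this.sequenceSize = sequenceSize;
            this.payload = payload;
        }
    }

    /** Splits one request into parts that all share the same correlation ID. */
    public static List<Part> split(String correlationId, List<String> payloads) {
        List<Part> parts = new ArrayList<>();
        for (int i = 0; i < payloads.size(); i++) {
            parts.add(new Part(correlationId, i + 1, payloads.size(), payloads.get(i)));
        }
        return parts;
    }

    /** Groups parts by correlation ID and releases a group when it is complete. */
    public static final class Aggregator {
        private final Map<String, List<Part>> groups = new HashMap<>();

        /** Returns the group's payloads (in arrival order) once all parts are in. */
        public synchronized Optional<List<String>> accept(Part part) {
            List<Part> group = groups.computeIfAbsent(part.correlationId, k -> new ArrayList<>());
            group.add(part);
            if (group.size() < part.sequenceSize) {
                return Optional.empty(); // still waiting for the remaining parts
            }
            groups.remove(part.correlationId);
            List<String> payloads = new ArrayList<>();
            for (Part p : group) {
                payloads.add(p.payload);
            }
            return Optional.of(payloads);
        }

        /** Number of groups still waiting for at least one part. */
        public synchronized int pendingGroups() {
            return groups.size();
        }
    }
}
```

In this model, parts from concurrent requests can arrive interleaved without mixing, because each group is keyed by its own correlation ID. A group whose remaining part never arrives simply stays pending.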

I think your problem is that some request in the split set fails, so the aggregator can't complete the group for that request. I don't see anything obviously wrong in your config so far...
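If an incomplete group is indeed the cause, one possible mitigation is to give the aggregator a group timeout so that a caller is not left waiting for a part that will never arrive. The fragment below is a hedged sketch of such a configuration, not a verified fix for this setup: the class name `AggregateTimeoutConfig` and the 5000 ms value are assumptions, and the transformer step from the question is left out for brevity.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@Configuration
public class AggregateTimeoutConfig { // hypothetical class name

    @Bean
    public IntegrationFlow aggregateFlow() {
        return IntegrationFlows.from("aggregate_channel")
                .aggregate(a -> a
                        // Assumed value: stop waiting for missing parts after 5 seconds
                        .groupTimeout(5000L)
                        // Release whatever has arrived instead of discarding the group
                        .sendPartialResultOnExpiry(true)
                        // Remove completed groups from the message store
                        .expireGroupsUponCompletion(true))
                .channel("transform_channel")
                .get();
    }
}
```

With partial results enabled, the downstream transformer would need to cope with a list that contains only one of the two responses.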