I have a scenario where I need to invoke A and B system's REST call in parallel and aggregate the responses and transform into single FinalResponse
.
To achieve this, I am using Spring Integration splitter and agreggator and the configuration is as given below.
I have exposed a REST endpoint, when the request(request has co-relationId in the header) comes to the controller, we invoke gateway and the splitter sends requests to A and B channels .
Service activator A listens to channel A and invokes A system's REST call and service Activator B listens to B channel and invokes B system's REST call.
Then I need to aggregate the responses from A and B system and then transform it into FinalResponse
. Currently the aggregation and transformation is working fine.
Sometimes when multiple requests come to controller, the FinalResponse
takes more time when compared to single request to controller. All the responses to the requests come almost at the same time not sure why (even though the last request to controller was sent 6-7 secs after the 1st request). Is there something wrong in my configuration related to threads? Not sure why it takes more time to respond when multiple requests comes to the controller.
Also, I am not using any CorrelationStrategy, do we need to use it? Will I face any issues in multi threading environment with the below configuration? Any feedback on the configuration would be helpful
// Controller
{
FinalResponse aggregatedResponse = gateway.collateServiceInformation(inputData);
}
//Configuration
@Autowired
Transformer transformer;
//Gateway
@Bean
public IntegrationFlow gatewayServiceFlow() {
return IntegrationFlows.from("input_channel")
.channel("split_channel").get();
}
//splitter
@Bean
public IntegrationFlow splitAggregatorFlow() {
return IntegrationFlows.from("split_channel").
.split(SomeClass.class, SomeClass::getResources)
.channel(c -> c.executor(Executors.newCachedThreadPool()))
.<Resource, String>route(Resource::getName,
mapping -> mapping.channelMapping("A", "A")
.channelMapping("B", "B"))
.get();
}
//aggregator
@Bean
public IntegrationFlow aggregateFlow() {
return IntegrationFlows.from("aggregate_channel").aggregate()
.channel("transform_channel").transform(transformer).get();
}
.
.
.
//Transformer
@Component
@Scope("prototype")
public class Transformer {
@Transformer
public FinalResponse transform(final List<Result> responsesFromAAndB) {
//transformation logic and then return final response
}
}