I currently have a Spring Integration application which is utilizing a number of TCP inbound and outbound adapter combinations for message handling. All of these adapter combinations utilize the same single MessageEndpoint for request processing and the same single MessagingGateway
for response sending.
The MessageEndpoint’s final output channel is a DirectChannel
that is also the DefaultRequestChannel of the MessageGateway. This DirectChannel
utilizes the default RoundRobinLoadBalancingStrategy
which is doing a Round Robin search for the correct Outbound Adapter to send the given response through. Of course, this round robin search does not always find the appropriate Outbound Adapter on first search and when it doesn’t it logs accordingly. Not only is this producing a large amount of unwanted logging but it also raises some performance concerns as I anticipate several hundred inbound/outbound adapter combinations existing at any given time.
I am wondering if there is a way in which I can more closely correlate the inbound and outbound adapters in a way that there is no need for the round robin processing and each response can be sent directly to the corresponding outbound adapter? Ideally, I would like this to be implemented in a way that the use of a single MessageEndpoint
and single MessageGateway
can be maintained.
Note: Please limit solutions to those which use the Inbound/Outbound Adapter combinations. The use of TcpInbound/TcpOutboundGateways is not possible for my implementation as I need to send multiple responses to a single request and, to my knowledge, this can only be done with the use of inbound/outbound adapters.
To add some clarity, below is a condensed version of the current implementation described. I have tried to clear out any unrelated code just to make things easier to read...
// Inbound/Outbound Adapter creation (part of a service that is used to dynamically create varying number of inbound/outbound adapter combinations)
public void configureAdapterCombination(int port) {
TcpNioServerConnectionFactory connectionFactory = new TcpNioServerConnectionFactory(port);
// Connection Factory registered with Application Context bean factory (removed for readability)...
TcpReceivingChannelAdapter inboundAdapter = new TcpReceivingChannelAdapter();
inboundAdapter.setConnectionFactory(connectionFactory);
inboundAdapter.setOutputChannel(context.getBean("sendFirstResponse", DirectChannel.class));
// Inbound Adapter registered with Application Context bean factory (removed for readability)...
TcpSendingMessageHandler outboundAdapter = new TcpSendingMessageHandler();
outboundAdapter.setConnectionFactory(connectionFactory);
// Outbound Adapter registered with Application Context bean factory (removed for readability)...
context.getBean("outboundResponse", DirectChannel.class).subscribe(outboundAdapter);
}
// Message Endpoint for processing requests
@MessageEndpoint
public class RequestProcessor {
@Autowired
private OutboundResponseGateway outboundResponseGateway;
// Direct Channel which is using Round Robin lookup
@Bean
public DirectChannel outboundResponse() {
return new DirectChannel();
}
// Removed additional, unrelated, endpoints for readability...
@ServiceActivator(inputChannel="sendFirstResponse", outputChannel="sendSecondResponse")
public Message<String> sendFirstResponse(Message<String> message) {
// Unrelated message processing/response generation excluded...
outboundResponseGateway.sendOutboundResponse("First Response", message.getHeaders().get(IpHeaders.CONNECTION_ID, String.class));
return message;
}
// Service Activator that puts second response on the request channel of the Message Gateway
@ServiceActivator(inputChannel = "sendSecondResponse", outputChannel="outboundResponse")
public Message<String> processQuery(Message<String> message) {
// Unrelated message processing/response generation excluded...
return MessageBuilder.withPayload("Second Response").copyHeaders(message.getHeaders()).build();
}
}
// Messaging Gateway for sending responses
@MessagingGateway(defaultRequestChannel="outboundResponse")
public interface OutboundResponseGateway {
public void sendOutboundResponse(@Payload String payload, @Header(IpHeaders.CONNECTION_ID) String connectionId);
}
SOLUTION:
@Artem's suggestions in the comments/answers below seem to do the trick. Just wanted to make a quick note about how I was able to add a replyChannel
to each Outbound Adapter on creation.
What I did was create two maps that are being maintained by the application. The first map is populated whenever a new Inbound/Outbound adapter combination is created and it is a mapping of ConnectionFactory
name to replyChannel
name. The second map is a map of ConnectionId
to replyChannel
name and this is populated on any new TcpConnectionOpenEvent
via an EventListener
.
Note that every TcpConnectionOpenEvent
will have a ConnectionFactoryName
and ConnectionId
property defined based on where/how the connection is established.
From there, whenever a new request is received I use theses maps and the 'ip_connectionId' header on the Message
to add a replyChannel
header to the Message. The first response is sent by manually grabbing the corresponding replyChannel
(based on the value of the replyChannel
header) from the application's context and sending the response on that channel. The second response is sent via Spring Integration using the replyChannel
header on the message as Artem describes in his responses.
This solution was implemented as a quick proof of concept and is just something that worked for my current implementation. Including this to hopefully jumpstart other viewer's own implementations/solutions.