I currently have a Spring Integration application which is utilizing a number of TCP inbound and outbound adapter combinations for message handling. All of these adapter combinations utilize the same single MessageEndpoint for request processing and the same single MessagingGateway for response sending.

The MessageEndpoint's final output channel is a DirectChannel that is also the defaultRequestChannel of the MessagingGateway. This DirectChannel uses the default RoundRobinLoadBalancingStrategy, which searches round-robin for the correct outbound adapter to send each response through. This search does not always find the right outbound adapter on the first attempt, and when it doesn't, it logs accordingly. This produces a large amount of unwanted logging and also raises performance concerns, since I anticipate several hundred inbound/outbound adapter combinations existing at any given time.

Is there a way to correlate the inbound and outbound adapters more closely, so that no round-robin processing is needed and each response is sent directly to the corresponding outbound adapter? Ideally, I would like to keep using a single MessageEndpoint and a single MessagingGateway.

Note: Please limit solutions to those which use the Inbound/Outbound Adapter combinations. The use of TcpInbound/TcpOutboundGateways is not possible for my implementation as I need to send multiple responses to a single request and, to my knowledge, this can only be done with the use of inbound/outbound adapters.

To add some clarity, below is a condensed version of the current implementation described. I have tried to clear out any unrelated code just to make things easier to read...

// Inbound/Outbound Adapter creation (part of a service that is used to dynamically create varying number of inbound/outbound adapter combinations)
public void configureAdapterCombination(int port) {

    TcpNioServerConnectionFactory connectionFactory = new TcpNioServerConnectionFactory(port);
    // Connection Factory registered with Application Context bean factory (removed for readability)...

    TcpReceivingChannelAdapter inboundAdapter = new TcpReceivingChannelAdapter();
    inboundAdapter.setConnectionFactory(connectionFactory);
    inboundAdapter.setOutputChannel(context.getBean("sendFirstResponse", DirectChannel.class));
    // Inbound Adapter registered with Application Context bean factory (removed for readability)...

    TcpSendingMessageHandler outboundAdapter = new TcpSendingMessageHandler();
    outboundAdapter.setConnectionFactory(connectionFactory);
    // Outbound Adapter registered with Application Context bean factory (removed for readability)...

    context.getBean("outboundResponse", DirectChannel.class).subscribe(outboundAdapter);

}

// Message Endpoint for processing requests
@MessageEndpoint
public class RequestProcessor {

    @Autowired
    private OutboundResponseGateway outboundResponseGateway;

    // Direct Channel which is using Round Robin lookup
    @Bean
    public DirectChannel outboundResponse() {
        return new DirectChannel();
    }

    // Removed additional, unrelated, endpoints for readability...

    @ServiceActivator(inputChannel="sendFirstResponse", outputChannel="sendSecondResponse")
    public Message<String> sendFirstResponse(Message<String> message) {
        // Unrelated message processing/response generation excluded...
        outboundResponseGateway.sendOutboundResponse("First Response", message.getHeaders().get(IpHeaders.CONNECTION_ID, String.class));
        return message;
    }

    // Service Activator that puts second response on the request channel of the Message Gateway
    @ServiceActivator(inputChannel = "sendSecondResponse", outputChannel="outboundResponse")
    public Message<String> processQuery(Message<String> message) {
        // Unrelated message processing/response generation excluded...
        return MessageBuilder.withPayload("Second Response").copyHeaders(message.getHeaders()).build();
    }

}

// Messaging Gateway for sending responses
@MessagingGateway(defaultRequestChannel="outboundResponse")
public interface OutboundResponseGateway {
    public void sendOutboundResponse(@Payload String payload, @Header(IpHeaders.CONNECTION_ID) String connectionId);
}

SOLUTION:

@Artem's suggestions in the comments/answers below seem to do the trick. Just wanted to make a quick note about how I was able to add a replyChannel to each Outbound Adapter on creation.

What I did was create two maps that are being maintained by the application. The first map is populated whenever a new Inbound/Outbound adapter combination is created and it is a mapping of ConnectionFactory name to replyChannel name. The second map is a map of ConnectionId to replyChannel name and this is populated on any new TcpConnectionOpenEvent via an EventListener.

Note that every TcpConnectionOpenEvent will have a ConnectionFactoryName and ConnectionId property defined based on where/how the connection is established.

From there, whenever a new request is received I use these maps and the 'ip_connectionId' header on the Message to add a replyChannel header to the Message. The first response is sent by manually grabbing the corresponding replyChannel (based on the value of the replyChannel header) from the application's context and sending the response on that channel. The second response is sent via Spring Integration using the replyChannel header on the message, as Artem describes in his responses.
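The two-map bookkeeping described above can be sketched in plain Java as follows. This is only an illustration under assumptions, not the code actually used: the class name `ReplyChannelRegistry` and all of its method names are hypothetical, and the cleanup method on connection close is an addition that the description above does not mention. In a real application, `connectionOpened` would presumably be called from an `@EventListener` method that receives a `TcpConnectionOpenEvent` and passes along its connection factory name and connection ID.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry; names are illustrative and not taken from the original application. */
final class ReplyChannelRegistry {

    // Connection factory name -> reply channel name (filled when an adapter pair is created).
    private final Map<String, String> channelByFactory = new ConcurrentHashMap<>();

    // Connection ID -> reply channel name (filled when a connection opens).
    private final Map<String, String> channelByConnection = new ConcurrentHashMap<>();

    /** Call once per inbound/outbound adapter combination. */
    void registerFactory(String connectionFactoryName, String replyChannelName) {
        channelByFactory.put(connectionFactoryName, replyChannelName);
    }

    /** Intended to be driven by a connection-open event. */
    void connectionOpened(String connectionFactoryName, String connectionId) {
        String channelName = channelByFactory.get(connectionFactoryName);
        if (channelName != null) {
            channelByConnection.put(connectionId, channelName);
        }
    }

    /** Assumption: entries are dropped on close so the map does not grow without bound. */
    void connectionClosed(String connectionId) {
        channelByConnection.remove(connectionId);
    }

    /** Looks up the reply channel name for the 'ip_connectionId' header value of a request. */
    Optional<String> replyChannelFor(String connectionId) {
        return Optional.ofNullable(channelByConnection.get(connectionId));
    }
}
```

The name returned by `replyChannelFor` is what would be placed in the replyChannel header of each incoming request before the rest of the flow runs.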

This solution was implemented as a quick proof of concept and is just something that worked for my current implementation. I am including it in the hope that it jumpstarts other viewers' own implementations/solutions.

Please, share your configuration with us for better understanding what is going on. - Artem Bilan
@Artem is there something specific you want to see? I can update the original question to include what you need. Just want to make sure what I add is actually what you need. - chimiz13
Show me, please, at least one flow from start to the end. I don't see the whole picture. Of course, would be great to see the project at all, but I guess it is not possible from your side... - Artem Bilan
This architecture sounds very bizarre. I agree that we need some configuration and some explanation of exactly what you are trying to achieve. - Gary Russell
I have updated the question to include the configuration. - chimiz13

1 Answer


Well, I see your point about round-robin now. You create many similar TCP channel adapters against the same channels. In this case it is indeed hard to distinguish one flow from another, because you have little control over those channels and their subscribers.

One solution that would work well is the Spring Integration Java DSL and its dynamic flows: https://docs.spring.io/spring-integration/reference/html/dsl.html#java-dsl-runtime-flows

With that, you would concentrate only on the flows and not worry about runtime registration. But since you are not there yet and are dealing with plain Java and annotation configuration, the goal is much harder to achieve. But still...

You may know that there is a replyChannel header. It is taken into account when no outputChannel is configured. This way you can have an isolated channel for each flow, and the configuration stays the same for all the flows.

So,

  • I would create a new channel for each configureAdapterCombination() call.
  • Pass this channel into that method and call replyChannel.subscribe(outboundAdapter);
  • Use this channel at the beginning of that particular flow to populate the replyChannel header.

This way your processQuery() service activator should have no outputChannel. The output channel is then taken from the replyChannel header, which correlates the reply with the proper outbound channel adapter.

You don't need a @MessagingGateway for such a scenario, since there is no fixed defaultRequestChannel any more. In the sendFirstResponse() service method you just take the replyChannel header and send a newly created message to it manually. Technically this is exactly what you are trying to do with the @MessagingGateway.
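To make the idea concrete without depending on the framework, here is a plain-Java model of header-based reply routing. It deliberately uses no Spring types: `SketchChannel` and `ReplyRoutingSketch` are hypothetical stand-ins, a `Map` plays the role of the message headers, and the `Consumer` plays the role of the outbound adapter subscribed to one flow's channel. In Spring Integration itself the header is named `replyChannel` (`MessageHeaders.REPLY_CHANNEL`); everything else here is an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Stand-in for a per-flow channel with exactly one subscriber; not a Spring type. */
final class SketchChannel {
    private final Consumer<String> subscriber;

    SketchChannel(Consumer<String> subscriber) {
        this.subscriber = subscriber;
    }

    void send(String payload) {
        subscriber.accept(payload);
    }
}

/** Hypothetical helper modelling the three steps listed above. */
final class ReplyRoutingSketch {
    static final String REPLY_CHANNEL = "replyChannel";

    /** At the start of a flow, stamp that flow's own channel into a copy of the headers. */
    static Map<String, Object> enrich(Map<String, Object> headers, SketchChannel replyChannel) {
        Map<String, Object> copy = new HashMap<>(headers);
        copy.put(REPLY_CHANNEL, replyChannel);
        return copy;
    }

    /** Send a response to the channel named in the headers; no search across adapters. */
    static void reply(Map<String, Object> headers, String payload) {
        Object channel = headers.get(REPLY_CHANNEL);
        if (!(channel instanceof SketchChannel)) {
            throw new IllegalStateException("No replyChannel header on the message");
        }
        ((SketchChannel) channel).send(payload);
    }
}
```

Because each flow carries its own channel in the headers, both the manually sent first response and the second response reach the same single subscriber, and no other adapter is ever tried.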

For the Java DSL variant I would go with a filter on a PublishSubscribeChannel to discard messages that don't belong to the current flow. Anyway, that is a different story.

Try to figure out how you can have a reply channel per flow when you call configureAdapterCombination().