
I am working with Spring Integration 4.2.5.RELEASE.

I have the following scenario (for experimentation purposes) for an overall integration flow:

  • Receive Data from a JMS Destination and send a reply
  • Do something with the data received
  • Send some data to a JMS Destination and receive a reply

For the first two points I have the following:

<!-- From Remote To Local -->           

<int:channel id="requestFromRemoteToLocalChannel" 
             datatype="com.manuel.jordan.message.Message" />

<int:publish-subscribe-channel id="responseFromRemoteToLocalChannel" 
                               datatype="com.manuel.jordan.message.Message" />

<!-- From Local To Remote -->   

<int:channel id="requestFromLocalToRemoteChannel"                       
             datatype="com.manuel.jordan.message.Message" />

<int:channel id="responseFromLocalToRemoteChannel" 
             datatype="com.manuel.jordan.message.Message" />

<int:channel id="responseFromLocalToRemoteChannelPost" 
             datatype="com.manuel.jordan.message.Message" />

<int-jms:inbound-gateway 
         id="gatewayIn"
         request-destination="queueDestinationFromRemoteToLocal"
         default-reply-destination="queueDestinationFromRemoteToLocalReply"                          
         connection-factory="cachingConnectionFactoryForSpringIntegration"
         request-channel="requestFromRemoteToLocalChannel"
         reply-channel="responseFromRemoteToLocalChannel"
/>

followed by two service activators:

<int:service-activator 
     id="serviceActivatorReceptorFromRemoteToLocal"
     input-channel="requestFromRemoteToLocalChannel"
     output-channel="responseFromRemoteToLocalChannel"
     ref="messageServerReceptorJmsGatewayEndpoint"
     method="receptorFromRemoteToLocal" />

<int:service-activator 
     id="serviceActivatorRemoveReplyFromRemoteToLocal"
     input-channel="responseFromRemoteToLocalChannel"
     output-channel="nullChannel"
     ref="messageServerReceptorJmsGatewayEndpoint"
     method="removeReplyFromRemoteToLocal" />

Up to this point everything works fine.

Through a JmsTemplate I send data to the queueDestinationFromRemoteToLocal Destination. The int-jms:inbound-gateway, whose request-destination points to that same Destination, automatically picks up the Message, passes it to a service activator, and sends a reply to another JMS destination, where a @JmsListener does its job.

Again, up to here everything is OK.

The problem appears when I want to send some data from this last service activator to an int-jms:outbound-gateway, in order to accomplish the third point of the overall integration process.

I just change the last service activator to:

<int:service-activator 
     id="serviceActivatorRemoveReplyFromRemoteToLocal"
     input-channel="responseFromRemoteToLocalChannel"
     output-channel="requestFromLocalToRemoteChannel"
     ref="messageServerReceptorJmsGatewayEndpoint"
     method="removeReplyFromRemoteToLocal" />

In practice, the only change is from output-channel="nullChannel" to output-channel="requestFromLocalToRemoteChannel".

And adding:

<int-jms:outbound-gateway 
         id="gatewayOut"
         request-destination="queueDestinationFromLocalToRemote"
         reply-destination="queueDestinationFromLocalToRemoteReply"
         connection-factory="selectedConnectionFactory"
         correlation-key="JMSCorrelationID"
         explicit-qos-enabled="true"
         delivery-persistent="true"
         auto-startup="false"
         request-channel="requestFromLocalToRemoteChannel"
         reply-channel="responseFromLocalToRemoteChannel">
    <int-jms:reply-listener />
</int-jms:outbound-gateway>

<int:service-activator 
     id="serviceActivatorRemoveReplyFromLocalToRemote"
     input-channel="responseFromLocalToRemoteChannel"
     output-channel="nullChannel"
     ref="messageServerReceptorJmsGatewayEndpoint"
     method="removeReplyFromLocalToRemote" />

Enabling:

<logger name="org.springframework.integration">
    <level value="debug" />
</logger>

I can see:

31634 [gatewayIn.container-1] DEBUG o.s.i.j.ChannelPublishingJmsMessageListener$GatewayDelegate - failure occurred in gateway sendAndReceive: Dispatcher has no subscribers for channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@51565ec2.requestFromLocalToRemoteChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers

In practice, the int-jms:outbound-gateway is never called.

What is missing?

Even if I change requestFromLocalToRemoteChannel from a direct channel to a queue channel and add a poller to the int-jms:outbound-gateway, nothing happens: the message shown above disappears, but the process hangs.

Note: the int-jms:outbound-gateway uses auto-startup="false" because its connection-factory="selectedConnectionFactory" (which is not cached) comes from:

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(...);
connectionFactory.setClientID("localConnectionFactory");
connectionFactory.setUserName("Manolito - Local");
connectionFactory.createConnection().start(); // because of this call
connectionFactory.setExceptionListener(new ConnectionFactoryExceptionListener());
return connectionFactory;

The other settings follow the reference documentation, section 20.5 Outbound Gateway.

If I use:

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(...);
connectionFactory.setClientID("localConnectionFactory");
connectionFactory.setUserName("Manolito - Local");
//connectionFactory.createConnection().start();
connectionFactory.setExceptionListener(new ConnectionFactoryExceptionListener());
return connectionFactory;

and set auto-startup="true" (which is the default anyway), I get:

24917 [gatewayIn.container-1] DEBUG o.s.i.j.ChannelPublishingJmsMessageListener$GatewayDelegate - failure occurred in gateway sendAndReceive: nested exception is javax.jms.InvalidClientIDException: Broker: localhost - Client: localConnectionFactory already connected from tcp://127.0.0.1:59497

Alpha

Adding a new bean:

@Bean
@Conditional(value=ActiveMQConnectionFactoryLocalHostStatusCondition.class)
public ActiveMQConnectionFactory localConnectionFactoryForSpringIntegration(
            @Value("${activemq.localhost.protocol}") String protocol,
            @Value("${activemq.localhost.address}") String host,
            @Value("${activemq.localhost.port}") String port) throws JMSException {

        logger.info("Creating localConnectionFactoryForSpringIntegration...");

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(...); // same URL as localConnectionFactory
        connectionFactory.setClientID("localConnectionFactoryForSpringIntegration");
        connectionFactory.setUserName("Manolito - Local ForSpringIntegration");
        //connectionFactory.createConnection().start();
        connectionFactory.setExceptionListener(new ConnectionFactoryExceptionListener());
        return connectionFactory;
    }

Here the values passed to setClientID and setUserName are new, and localConnectionFactoryForSpringIntegration is used only in the gateway below, via connection-factory="localConnectionFactoryForSpringIntegration".

And changing the gateway to:

<int-jms:outbound-gateway 
         id="gatewayOut"
         request-destination="queueDestinationFromLocalToRemote"
         reply-destination="queueDestinationFromLocalToRemoteReply"
         connection-factory="localConnectionFactoryForSpringIntegration"
         correlation-key="JMSCorrelationID"
         explicit-qos-enabled="true"
         delivery-persistent="true"
         auto-startup="true"
         request-channel="requestFromLocalToRemoteChannel"
         reply-channel="responseFromLocalToRemoteChannel">
    <int-jms:reply-listener />
</int-jms:outbound-gateway>

I again get:

[gatewayIn.container-1] DEBUG o.s.i.j.ChannelPublishingJmsMessageListener$GatewayDelegate - failure occurred in gateway sendAndReceive: nested exception is javax.jms.InvalidClientIDException: Broker: localhost - Client: localConnectionFactoryForSpringIntegration already connected from tcp://127.0.0.1:50056

To summarize so far:

  • I have localConnectionFactory and localConnectionFactoryForSpringIntegration pointing to the same BrokerURL
  • int-jms:outbound-gateway needs a non-cached connection-factory
  • localConnectionFactoryForSpringIntegration has connectionFactory.createConnection().start() commented out
  • int-jms:outbound-gateway has auto-startup="true"

It seems that at some point localConnectionFactoryForSpringIntegration has already been connected, and the int-jms:outbound-gateway throws:

localhost - Client: localConnectionFactoryForSpringIntegration already connected from tcp://127.0.0.1:50056

It seems something is still missing.

1 Answer

Consuming endpoints don't subscribe to a direct channel (or poll from a queue channel) until they are started. Hence the dispatcher has no subscribers while the gateway is stopped (or, with a queue channel, nothing polls the queue).
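
As a rough sketch of this point, and assuming the gateway's connection factory can safely open connections when the context starts, leaving auto-startup at its default lets the gateway subscribe to requestFromLocalToRemoteChannel at startup. The bean names are the ones from the question; this is an illustration, not a verified fix for the whole flow:

```xml
<!-- Sketch: auto-startup is omitted, so it defaults to "true" and the gateway
     subscribes to requestFromLocalToRemoteChannel when the context starts. -->
<int-jms:outbound-gateway
         id="gatewayOut"
         request-destination="queueDestinationFromLocalToRemote"
         reply-destination="queueDestinationFromLocalToRemoteReply"
         connection-factory="selectedConnectionFactory"
         correlation-key="JMSCorrelationID"
         request-channel="requestFromLocalToRemoteChannel"
         reply-channel="responseFromLocalToRemoteChannel">
    <int-jms:reply-listener />
</int-jms:outbound-gateway>
```

If the gateway has to stay stopped at startup, it must be started explicitly (for example by calling start() on the gatewayOut endpoint) before the first message reaches the channel.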

InvalidClientIDException

You can't create multiple connections with the same client id; if you use different connection factories, they need unique client ids.
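
One way to stay within that rule, sketched here as an assumption rather than a confirmed fix: leave clientID unset on the factory used by the gateway, so that ActiveMQ generates a distinct client id for each connection the factory creates. The property placeholder activemq.localhost.url is hypothetical and stands in for the broker URL that the question elides; the question defines this bean in Java, so the XML form is only for illustration.

```xml
<bean id="localConnectionFactoryForSpringIntegration"
      class="org.apache.activemq.ActiveMQConnectionFactory">
    <!-- Hypothetical placeholder for the broker URL elided in the question -->
    <property name="brokerURL" value="${activemq.localhost.url}" />
    <!-- clientID intentionally not set: a fixed id is applied to every
         connection this factory creates, so a second connection is rejected -->
</bean>
```

If a fixed client id is required (for durable subscriptions, say), the factory must only ever hand out a single connection with that id.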