My Integration Flow code is:

@Bean
public IntegrationFlow messageFlow() {
    return IntegrationFlows.from(stompInboundChannelAdapter())
            .transform(inBoundStompMsgTransformer::transform)
            .headerFilter("stomp_subscription", "content-length")
            .handle(Amqp.outboundAdapter(outboundConfiguration.rabbitTemplate()))
            .get();
}

I am using Spring Boot.

The log clearly states that the {transformer} subscriber has been added to the input channel:

2019-12-09 18:21:41.752  INFO 18248 --- [           main] o.s.i.s.i.StompInboundChannelAdapter     : started bean 'stompInboundChannelAdapter'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@21e360a'
2019-12-09 18:21:41.768  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {transformer} as a subscriber to the 'stompInputChannel' channel
2019-12-09 18:21:41.768  INFO 18248 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.stompInputChannel' has 1 subscriber(s).
2019-12-09 18:21:41.768  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'inboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'bean method inboundFlow'
2019-12-09 18:21:41.768  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {header-filter} as a subscriber to the 'inboundFlow.channel#0' channel
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.inboundFlow.channel#0' has 1 subscriber(s).
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'inboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'bean method inboundFlow'
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {amqp:outbound-channel-adapter} as a subscriber to the 'inboundFlow.channel#1' channel
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.inboundFlow.channel#1' has 1 subscriber(s).
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'inboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#2'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'bean method inboundFlow'

However, I get an exception and lose the first one or two messages that are in the queue. The remaining messages are processed.

Assume that there are 10 messages in the queue before I start the application. After I start it, I get an exception even though the log says that the subscriber has been added and the bean has been started. After the exception, the remaining 8 or 9 messages are processed.

Exception is: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.stompInputChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage

It is clear that the context is not fully ready to process messages at that point, hence the exception. But the log messages are misleading.

My first question:

  1. What exactly does it mean when the subscriber has been added and the bean has been started? Does it mean that everything has been set up, but the context still has to become ready before it can process messages?

To overcome this, as suggested in many posts, I used a control bus to start the adapter. The code for that is:

......
@Component
public class ApplicationLifeCycle implements SmartLifecycle {

    @Autowired
    private MessageChannel controlBusChannel;

    @Override
    public void start() {
        System.out.println("Service starting...");
        controlBusChannel.send(new GenericMessage<>("@stompInboundChannelAdapter.start()"));
    }
.....

I created the ApplicationLifeCycle class, which implements SmartLifecycle, thinking it would be handy.

My second question is:

  1. Is this the right/best way of using the control bus to start and stop adapters? If it is not, please let me know the right approach.

Thanks,

Mahesh

1 Answer

I assume this is a continuation of your other question: IntegrationFlow Amqp Channel Adapter is not working in handle()

And there you have this:

@Bean
public StompInboundChannelAdapter stompInboundChannelAdapter() {
    StompInboundChannelAdapter adapter = new StompInboundChannelAdapter(stompSessionManager(), "/queue/myQueue");
    adapter.setOutputChannel(stompInputChannel());
    adapter.setPayloadType(ByteString.class);
    return adapter;
}

which you don't show here.

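The problem is that you then use the same bean definition in the IntegrationFlow. It turns out that the StompInboundChannelAdapter bean is started earlier; only afterwards is the IntegrationFlow processed and .transform(inBoundStompMsgTransformer::transform) subscribed to handle incoming messages. Any message the adapter receives in between is sent to a channel that has no subscriber yet, which is where the "Dispatcher has no subscribers" exception comes from.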

So, if you remove @Bean from stompInboundChannelAdapter(), it should work properly. I'll take a look later at why MessageProducerSupport is started earlier than IntegrationFlows...
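
To make the ordering issue concrete, here is a minimal plain-Java sketch. It does not use Spring Integration at all: ToyDirectChannel and StartupOrderDemo are hypothetical names invented for this illustration, and the channel is only a simplified stand-in for a synchronous channel that rejects messages while it has no subscriber. The sketch assumes the producer starts draining queued messages immediately and that the subscriber may be attached only after some of them have already been sent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical demo class; it only models the startup-order effect.
final class StartupOrderDemo {

    // Sends `total` queued messages through the channel. The subscriber is
    // attached only after `sentBeforeSubscribe` messages have been sent,
    // which models a producer that is started before the flow subscribes.
    static List<String> run(int sentBeforeSubscribe, int total) {
        ToyDirectChannel channel = new ToyDirectChannel();
        List<String> outcomes = new ArrayList<>();
        for (int i = 1; i <= total; i++) {
            if (i == sentBeforeSubscribe + 1) {
                channel.subscribe(msg -> outcomes.add("handled " + msg));
            }
            try {
                channel.send("msg-" + i);
            } catch (IllegalStateException e) {
                outcomes.add("lost msg-" + i + ": " + e.getMessage());
            }
        }
        return outcomes;
    }

    public static void main(String[] args) {
        System.out.println("Producer started before the subscriber:");
        run(2, 4).forEach(System.out::println);
        System.out.println("Subscriber attached before the producer starts:");
        run(0, 3).forEach(System.out::println);
    }
}

// Simplified stand-in for a synchronous channel (NOT Spring's DirectChannel).
final class ToyDirectChannel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Delivers on the caller's thread and fails if nobody has subscribed yet.
    void send(String message) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException("Dispatcher has no subscribers");
        }
        subscribers.get(0).accept(message);
    }
}
```

In this model the first two messages are lost when the producer runs ahead of the subscription, and none are lost when the subscriber is attached first. Leaving the adapter's lifecycle to the flow, as suggested above, is meant to give you the second ordering.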