My Integration Flow code is:
@Bean
public IntegrationFlow messageFlow() {
    return IntegrationFlows.from(stompInboundChannelAdapter())
            .transform(inBoundStompMsgTransformer::transform)
            .headerFilter("stomp_subscription", "content-length")
            .handle(Amqp.outboundAdapter(outboundConfiguration.rabbitTemplate()))
            .get();
}
I am using Spring Boot.
The log clearly states that the {transformer} subscriber has been added to the input channel:
2019-12-09 18:21:41.752 INFO 18248 --- [ main] o.s.i.s.i.StompInboundChannelAdapter : started bean 'stompInboundChannelAdapter'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@21e360a'
2019-12-09 18:21:41.768 INFO 18248 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {transformer} as a subscriber to the 'stompInputChannel' channel
2019-12-09 18:21:41.768 INFO 18248 --- [ main] o.s.integration.channel.DirectChannel : Channel 'application.stompInputChannel' has 1 subscriber(s).
2019-12-09 18:21:41.768 INFO 18248 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean 'inboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'bean method inboundFlow'
2019-12-09 18:21:41.768 INFO 18248 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {header-filter} as a subscriber to the 'inboundFlow.channel#0' channel
2019-12-09 18:21:41.772 INFO 18248 --- [ main] o.s.integration.channel.DirectChannel : Channel 'application.inboundFlow.channel#0' has 1 subscriber(s).
2019-12-09 18:21:41.772 INFO 18248 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean 'inboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'bean method inboundFlow'
2019-12-09 18:21:41.772 INFO 18248 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {amqp:outbound-channel-adapter} as a subscriber to the 'inboundFlow.channel#1' channel
2019-12-09 18:21:41.772 INFO 18248 --- [ main] o.s.integration.channel.DirectChannel : Channel 'application.inboundFlow.channel#1' has 1 subscriber(s).
2019-12-09 18:21:41.772 INFO 18248 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean 'inboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#2'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'bean method inboundFlow'
However, I get an exception and lose the first one or two messages that are in the queue. The remaining messages are processed.
Assume there are 10 messages in the queue before I start the application. After I start it, I get an exception even though the log says that the subscriber has been added and the bean has been started. After the exception, the remaining 8 or 9 messages are processed.
Exception is: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.stompInputChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage
It is clear that the context is not fully ready to process the messages and hence the exception. But the log messages are misleading.
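To illustrate the failure mode I suspect, here is a minimal plain-Java sketch with no Spring involved. `SimpleDirectChannel`, `run` and the message names are hypothetical and only model the ordering problem; this is not how the real `DirectChannel` is implemented. Messages sent before a subscriber is attached are rejected, and everything sent afterwards goes through.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class StartupOrderSketch {

    /** Hypothetical stand-in for a synchronous channel; not a Spring Integration class. */
    static class SimpleDirectChannel {
        private final String name;
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        SimpleDirectChannel(String name) {
            this.name = name;
        }

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void send(String payload) {
            if (subscribers.isEmpty()) {
                // Mirrors the wording of the exception in my log, nothing more.
                throw new IllegalStateException(
                        "Dispatcher has no subscribers for channel '" + name + "'");
            }
            // Simplification: always hand the message to the first subscriber.
            subscribers.get(0).accept(payload);
        }
    }

    /**
     * Sends the queued messages one by one. The only subscriber is attached
     * just before the message at index subscribeAfter is sent.
     * Returns the messages that were actually processed.
     */
    static List<String> run(List<String> queued, int subscribeAfter) {
        SimpleDirectChannel channel = new SimpleDirectChannel("stompInputChannel");
        List<String> processed = new ArrayList<>();
        for (int i = 0; i < queued.size(); i++) {
            if (i == subscribeAfter) {
                channel.subscribe(processed::add);
            }
            try {
                channel.send(queued.get(i));
            } catch (IllegalStateException e) {
                System.out.println("lost " + queued.get(i) + ": " + e.getMessage());
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> queued = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            queued.add("m" + i);
        }
        // Assumption for the demo: the subscriber arrives two messages late.
        List<String> processed = run(queued, 2);
        System.out.println("processed " + processed.size() + " of " + queued.size());
    }
}
```

With the subscriber arriving two messages late, the sketch loses `m1` and `m2` and processes the other eight, which matches what I observe.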
My first question:
- What exactly does it mean when the log says that the subscriber has been added and the bean is started? Does it mean that everything has been set up, but the context still has to become ready before it can process messages?
To overcome this, as suggested in many posts, I used a control bus to start the adapter. The code for that is:
......
@Component
public class ApplicationLifeCycle implements SmartLifecycle {

    @Autowired
    private MessageChannel controlBusChannel;

    @Override
    public void start() {
        System.out.println("Service starting...");
        controlBusChannel.send(new GenericMessage<>("@stompInboundChannelAdapter.start()"));
    }
.....
I created the ApplicationLifeCycle class, which implements SmartLifecycle, thinking it would be handy.
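For context on why I reached for SmartLifecycle: as I understand it, Spring starts lifecycle beans in ascending phase order, so a bean with a high phase should start after beans with lower phases. The sketch below models only that ordering in plain Java. The `Component` class, the component names and the phase values are hypothetical and chosen for illustration; they are not taken from my configuration or from Spring Integration defaults.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class PhasedStartSketch {

    /** Hypothetical component with a name and a start phase; models the idea only. */
    static final class Component {
        final String name;
        final int phase;

        Component(String name, int phase) {
            this.name = name;
            this.phase = phase;
        }
    }

    /** Returns the component names in start order: lowest phase first. */
    static List<String> startOrder(List<Component> components) {
        List<Component> sorted = new ArrayList<>(components);
        // List.sort is stable, so components with equal phases keep their declared order.
        sorted.sort(Comparator.comparingInt((Component c) -> c.phase));
        List<String> order = new ArrayList<>();
        for (Component c : sorted) {
            order.add(c.name);
        }
        return order;
    }

    public static void main(String[] args) {
        // Assumed phases: the bean that sends the start command gets the highest one.
        List<Component> components = Arrays.asList(
                new Component("applicationLifeCycle", Integer.MAX_VALUE),
                new Component("transformer consumer", 0),
                new Component("header-filter consumer", 0));
        System.out.println(startOrder(components));
    }
}
```

Under these assumed phases, the bean that sends the start command comes last, after both consumers have been started. Whether my real beans actually end up in this order is part of what I am asking.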
My second question is:
- Is this the right/best way of using the control bus to handle start/stop of adapters? If it is not, please let me know the right approach.
Thanks,
Mahesh