We have a fairly complicated spring-integration-amqp use case in one of our production applications, and we've been seeing "org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers" exceptions on startup. After those initial errors, the same components no longer throw the exception. This looks like a startup race condition affecting components that depend on AMQP outbound adapters and end up using them early in the lifecycle.
I can reproduce this by calling, from a @PostConstruct method, a gateway that sends to a channel wired to an outbound adapter.
config:
package gadams;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.amqp.Amqp;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.messaging.MessageChannel;

@SpringBootApplication
@IntegrationComponentScan
public class RabbitRace {

    public static void main(String[] args) {
        SpringApplication.run(RabbitRace.class, args);
    }

    @Bean(name = "HelloOut")
    public MessageChannel channelHelloOut() {
        return MessageChannels.direct().get();
    }

    @Bean
    public Queue queueHello() {
        return new Queue("hello.q");
    }

    @Bean(name = "helloOutFlow")
    public IntegrationFlow flowHelloOutToRabbit(RabbitTemplate rabbitTemplate) {
        return IntegrationFlows.from("HelloOut")
                .handle(Amqp.outboundAdapter(rabbitTemplate).routingKey("hello.q"))
                .get();
    }
}
gateway:
package gadams;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;

@MessagingGateway
public interface HelloGateway {

    @Gateway(requestChannel = "HelloOut")
    void sendMessage(String message);
}
component:
package gadams;

import javax.annotation.PostConstruct;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

@Component
@DependsOn("helloOutFlow")
public class HelloPublisher {

    @Autowired
    private HelloGateway helloGateway;

    @PostConstruct
    public void postConstruct() {
        helloGateway.sendMessage("hello");
    }
}
In my production use case, we have a component with a @PostConstruct method that uses a TaskScheduler to schedule a number of components. Some of them depend on AMQP outbound adapters, and some of those end up executing immediately. I've tried giving bean names to the IntegrationFlows that involve an outbound adapter and putting @DependsOn on the beans that use the gateways and/or on the gateway itself, but that doesn't get rid of the errors on startup.
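To make the suspected ordering concrete, here is a toy model in plain Java. It deliberately does not use any Spring classes: `ToyDirectChannel`, `ToyOutboundEndpoint`, and `simulateStartup` are hypothetical names made up for illustration. It also rests on an assumption that is not confirmed anywhere above, namely that the outbound endpoint only subscribes to its channel when it is started, and that this happens after the initialization callbacks of other beans have already run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class StartupOrderSketch {

    /** Toy stand-in for a direct channel: sending fails when nothing is subscribed. */
    static class ToyDirectChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        void send(String payload) {
            if (subscribers.isEmpty()) {
                throw new IllegalStateException("Dispatcher has no subscribers");
            }
            subscribers.forEach(s -> s.accept(payload));
        }
    }

    /** Toy stand-in for an outbound endpoint (assumption: it subscribes only in start()). */
    static class ToyOutboundEndpoint {
        private final ToyDirectChannel channel;
        final List<String> delivered = new ArrayList<>();

        ToyOutboundEndpoint(ToyDirectChannel channel) {
            this.channel = channel;
        }

        void start() {
            channel.subscribe(delivered::add);
        }
    }

    /** Runs an early send, then starts the endpoint, then sends again. */
    static List<String> simulateStartup() {
        List<String> log = new ArrayList<>();
        ToyDirectChannel helloOut = new ToyDirectChannel();
        // The endpoint object already exists here, but it has not been started.
        ToyOutboundEndpoint endpoint = new ToyOutboundEndpoint(helloOut);

        // Phase 1: an initialization callback sends before the endpoint is started.
        try {
            helloOut.send("hello");
            log.add("early send ok");
        } catch (IllegalStateException e) {
            log.add("early send failed: " + e.getMessage());
        }

        // Phase 2: the endpoint is started and subscribes; later sends are delivered.
        endpoint.start();
        helloOut.send("hello again");
        log.add("late send ok, delivered=" + endpoint.delivered);
        return log;
    }

    public static void main(String[] args) {
        simulateStartup().forEach(System.out::println);
    }
}
```

If the assumption holds, the model would also be consistent with @DependsOn not helping: in the sketch the endpoint is fully constructed before the first send, so ordering the construction of the two objects changes nothing. Only the later start() call makes the channel usable.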