We have a fairly complicated spring-integration-amqp use case in one of our production applications, and we've been seeing "org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers" exceptions on startup. After those initial errors, the same components don't throw the exception again. This looks like a startup race condition in components that depend on AMQP outbound adapters and end up using them early in the lifecycle.

I can reproduce this by calling, from a @PostConstruct method, a gateway that sends to a channel wired to an outbound adapter.

config:

package gadams;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.amqp.Amqp;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.messaging.MessageChannel;

@SpringBootApplication
@IntegrationComponentScan
public class RabbitRace {

    public static void main(String[] args) {
        SpringApplication.run(RabbitRace.class, args);
    }

    @Bean(name = "HelloOut")
    public MessageChannel channelHelloOut() {
        return MessageChannels.direct().get();
    }

    @Bean
    public Queue queueHello() {
        return new Queue("hello.q");
    }

    @Bean(name = "helloOutFlow")
    public IntegrationFlow flowHelloOutToRabbit(RabbitTemplate rabbitTemplate) {
        return IntegrationFlows.from("HelloOut").handle(Amqp.outboundAdapter(rabbitTemplate).routingKey("hello.q"))
                .get();
    }

}

gateway:

package gadams;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;

@MessagingGateway
public interface HelloGateway {

    @Gateway(requestChannel = "HelloOut")
    void sendMessage(String message);
}

component:

package gadams;

import javax.annotation.PostConstruct;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

@Component
@DependsOn("helloOutFlow")
public class HelloPublisher {

    @Autowired
    private HelloGateway helloGateway;

    @PostConstruct
    public void postConstruct() {
        helloGateway.sendMessage("hello");
    }
}

In my production use case, we have a component with a @PostConstruct method that uses a TaskScheduler to schedule a number of components. Some of those depend on AMQP outbound adapters, and some of them end up executing immediately. I've tried giving bean names to the IntegrationFlows that involve an outbound adapter and putting @DependsOn on the beans that use the gateways and/or on the gateway itself, but that doesn't get rid of the errors on startup.

I put my reproduction test case up on GitHub here: github.com/gadams00/rabbit-race

1 Answer


This is all about the Lifecycle. Spring Integration endpoints start listening for or producing messages only once their start() has been called.

With the default autoStartup = true, that typically happens in AbstractApplicationContext.finishRefresh(), at:

// Propagate refresh to lifecycle processor first.
getLifecycleProcessor().onRefresh();

Producing messages to the channel from @PostConstruct (or afterPropertiesSet()) is really too early, because it runs during bean initialization, well before finishRefresh().
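The ordering can be illustrated without Spring at all. The sketch below is a deliberately simplified model, not Spring Integration's real classes: ToyDirectChannel and ToyEndpoint are hypothetical names, and the only behavior they imitate is that an event-driven endpoint subscribes to its input channel in start(), so a send before that point finds no subscribers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class LifecycleOrderDemo {

    /** Toy stand-in for a direct channel: send() fails while nothing is subscribed. */
    static class ToyDirectChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        void send(String payload) {
            if (subscribers.isEmpty()) {
                throw new IllegalStateException("Dispatcher has no subscribers");
            }
            subscribers.forEach(s -> s.accept(payload));
        }
    }

    /** Toy stand-in for an endpoint: it subscribes to its channel only in start(). */
    static class ToyEndpoint {
        private final ToyDirectChannel channel;
        final List<String> delivered = new ArrayList<>();

        ToyEndpoint(ToyDirectChannel channel) {
            this.channel = channel;
        }

        void start() {
            channel.subscribe(delivered::add);
        }
    }

    /** Runs both phases in order and records what happened to each send. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ToyDirectChannel channel = new ToyDirectChannel();
        ToyEndpoint endpoint = new ToyEndpoint(channel);

        // Phase 1: bean initialization (where @PostConstruct runs).
        // The endpoint exists but has not been started yet.
        try {
            channel.send("hello");
            log.add("early send delivered");
        } catch (IllegalStateException e) {
            log.add("early send failed: " + e.getMessage());
        }

        // Phase 2: lifecycle start (what onRefresh() triggers), then send again.
        endpoint.start();
        channel.send("hello");
        log.add("late send delivered: " + endpoint.delivered);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model the first send fails and the second succeeds, which matches the symptom in the question: errors only at startup, and none once the context has finished refreshing.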

You should reconsider your producing logic and move it into the SmartLifecycle.start() phase.
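As a rough sketch of that suggestion, the HelloPublisher from the question could implement SmartLifecycle instead of using @PostConstruct. This is untested against the question's project. The phase value is an assumption chosen so that this bean starts late (lower phases start first), and it is worth checking against the phases of the endpoints in your Spring Integration version. All SmartLifecycle methods are implemented explicitly so the class also compiles on Spring versions where the interface has no default methods.

```java
package gadams;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;

@Component
public class HelloPublisher implements SmartLifecycle {

    // HelloGateway is the @MessagingGateway interface from the question.
    private final HelloGateway helloGateway;

    private volatile boolean running;

    @Autowired
    public HelloPublisher(HelloGateway helloGateway) {
        this.helloGateway = helloGateway;
    }

    @Override
    public void start() {
        // Invoked by the lifecycle processor during finishRefresh(),
        // not during bean initialization like @PostConstruct.
        helloGateway.sendMessage("hello");
        running = true;
    }

    @Override
    public void stop() {
        running = false;
    }

    @Override
    public void stop(Runnable callback) {
        stop();
        // The callback must always be run so shutdown can proceed.
        callback.run();
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    @Override
    public boolean isAutoStartup() {
        return true;
    }

    @Override
    public int getPhase() {
        // Assumption: the highest phase starts last, after the flow's endpoints.
        return Integer.MAX_VALUE;
    }
}
```

With this shape the @DependsOn("helloOutFlow") annotation should no longer be needed, since ordering is decided by lifecycle phase rather than by bean creation order. The same idea would apply to the TaskScheduler case: do the scheduling in start() rather than in @PostConstruct.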

See more info in the Reference Manual.