I'm testing an upgrade of my Spring Cloud Data Flow services from Spring Cloud Dalston.SR4/Spring Boot 1.5.9 to Spring Cloud Edgware/Spring Boot 1.5.9. Some of my services extend source (or sink) components from the app starters. I've found that this does not work with Spring Cloud Edgware.

For example, I have subclassed org.springframework.cloud.stream.app.rabbit.source.RabbitSourceConfiguration and bound my app to my subclass. This has previously worked with Spring Cloud versions going back almost a year.

With Edgware, I get the following (whether the app is run standalone or within Data Flow):

***************************
APPLICATION FAILED TO START
***************************

Description:

Field channels in org.springframework.cloud.stream.app.rabbit.source.RabbitSourceConfiguration required a bean of type 'org.springframework.cloud.stream.messaging.Source' that could not be found.


Action:

Consider defining a bean of type 'org.springframework.cloud.stream.messaging.Source' in your configuration.

I get the same behaviour with both the 1.3.0.RELEASE and 1.2.0.RELEASE versions of spring-cloud-starter-stream-rabbit.

I override RabbitSourceConfiguration so I can set a header mapper on the AmqpInboundChannelAdapter, and also to perform a connectivity test prior to starting up the container.

My subclass is bound to the Spring Boot application with @EnableBinding(HeaderMapperRabbitSourceConfiguration.class). A cut-down version of my subclass is:

public class HeaderMapperRabbitSourceConfiguration extends RabbitSourceConfiguration {

    public HeaderMapperRabbitSourceConfiguration(final MyHealthCheck healthCheck,
                                                 final MyAppConfig config) {
        // ...
    }

    @Bean
    @Override
    public AmqpInboundChannelAdapter adapter() {
        final AmqpInboundChannelAdapter adapter = super.adapter();
        adapter.setHeaderMapper(new NotificationHeaderMapper(config));

        return adapter;
    }

    @Bean
    @Override
    public SimpleMessageListenerContainer container() {
        if (config.performConnectivityCheckOnStartup()) {

            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Attempting connectivity with ...");
            }
            final Health health = healthCheck.health();
            if (health.getStatus() == Status.DOWN) {
                LOGGER.error("Unable to connect .....");
                throw new UnableToLoginException("Unable to connect ...");
            } else if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Connectivity established with ...");
            }
        }

        return super.container();
    }
}
Show your RabbitSourceConfiguration. - Gary Russell
I've added the config class to the original question. - bikerlad

1 Answer


You really should never do stuff like healthCheck.health(); within a @Bean definition. The application context is not yet fully baked or started; it may, or may not, work depending on the order that beans are created.

If you want to prevent the app from starting, add a bean that implements SmartLifecycle and put it in a late phase (a high value) so that it is started after everything else. Then put your code in start(). autoStartup must be true (that is, isAutoStartup() must return true).
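A minimal sketch of that approach, assuming the health check can be injected as a Spring Boot HealthIndicator (the class name StartupConnectivityCheck and the exception message are hypothetical, and your MyHealthCheck may expose a different type). All SmartLifecycle methods are implemented explicitly because the Spring version that ships with Boot 1.5 has no default implementations for them:

```java
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.actuate.health.Status;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;

// Hypothetical class: runs the connectivity check once the context has been
// refreshed, instead of inside a @Bean method.
@Component
public class StartupConnectivityCheck implements SmartLifecycle {

    // Assumption: the health check is available as a HealthIndicator bean.
    private final HealthIndicator healthCheck;

    private volatile boolean running;

    public StartupConnectivityCheck(HealthIndicator healthCheck) {
        this.healthCheck = healthCheck;
    }

    @Override
    public void start() {
        Health health = this.healthCheck.health();
        if (Status.DOWN.equals(health.getStatus())) {
            // An exception thrown here makes the context refresh fail,
            // so the application does not start.
            throw new IllegalStateException("Connectivity check failed: " + health);
        }
        this.running = true;
    }

    @Override
    public void stop() {
        this.running = false;
    }

    @Override
    public void stop(Runnable callback) {
        stop();
        callback.run(); // tell the container that shutdown of this bean is complete
    }

    @Override
    public boolean isRunning() {
        return this.running;
    }

    @Override
    public boolean isAutoStartup() {
        return true; // must be true, otherwise start() is not called on refresh
    }

    @Override
    public int getPhase() {
        return Integer.MAX_VALUE; // highest phase: started last, stopped first
    }
}
```

With something like this in place, container() in the subclass can simply return super.container(), and the check no longer depends on the order in which beans are created.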

In this case, it's being run before the stream infrastructure has created the channel.

Some ordering might have changed since the earlier release but, in any case, doing work like this in a @Bean definition is dangerous.

You just happened to be lucky before.

EDIT

I just noticed your @EnableBinding is wrong; it should be @EnableBinding(Source.class). I can't see how that would ever have worked - that's what creates the bean for the channels field of type Source.

This works fine for me after updating stream and the binder to 1.3.0.RELEASE...

@Configuration
public class MySource extends RabbitSourceConfiguration {

    @Bean
    @Override
    public AmqpInboundChannelAdapter adapter() {
        AmqpInboundChannelAdapter adapter = super.adapter();
        adapter.setHeaderMapper(new MyMapper());
        return adapter;
    }

}

and

@SpringBootApplication
@EnableBinding(Source.class)
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

If that doesn't work, please edit the question to show your POM.