I'm testing an upgrade of my Spring Cloud DataFlow services from Spring Cloud Dalston.SR4/Spring Boot 1.5.9 to Spring Cloud Edgware/Spring Boot 1.5.9. Some of my services extend source (or sink) components from the app starters. I've found this does not work with Spring Cloud Edgware.
For example, I have overridden org.springframework.cloud.stream.app.rabbit.source.RabbitSourceConfiguration
and bound my app to my overridden version. This has previously worked with Spring Cloud versions going back almost a year.
With Edgware, I get the following (whether the app is run standalone or within dataflow):
***************************
APPLICATION FAILED TO START
***************************
Description:
Field channels in org.springframework.cloud.stream.app.rabbit.source.RabbitSourceConfiguration required a bean of type 'org.springframework.cloud.stream.messaging.Source' that could not be found.
Action:
Consider defining a bean of type 'org.springframework.cloud.stream.messaging.Source' in your configuration.
I get the same behaviour with the 1.3.0.RELEASE and 1.2.0.RELEASE of spring-cloud-starter-stream-rabbit.
I override RabbitSourceConfiguration so I can set a header mapper on the AmqpInboundChannelAdapter, and also to perform a connectivity test prior to starting up the container.
My subclass is bound to the Spring Boot application with @EnableBinding(HeaderMapperRabbitSourceConfiguration.class)
. A cutdown version of my subclass is:
public class HeaderMapperRabbitSourceConfiguration extends RabbitSourceConfiguration {
public HeaderMapperRabbitSourceConfiguration(final MyHealthCheck healthCheck,
final MyAppConfig config) {
// ...
}
@Bean
@Override
public AmqpInboundChannelAdapter adapter() {
final AmqpInboundChannelAdapter adapter = super.adapter();
adapter.setHeaderMapper(new NotificationHeaderMapper(config));
return adapter;
}
@Bean
@Override
public SimpleMessageListenerContainer container() {
if (config.performConnectivityCheckOnStartup()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Attempting connectivity with ...");
}
final Health health = healthCheck.health();
if (health.getStatus() == Status.DOWN) {
LOGGER.error("Unable to connect .....");
throw new UnableToLoginException("Unable to connect ...");
} else if (LOGGER.isInfoEnabled()) {
LOGGER.info("Connectivity established with ...");
}
}
return super.container();
}
}
RabbitSourceConfiguration
. – Gary Russell