1
votes

I have a small POC for a custom Spring Cloud Data Flow stream using RabbitMQ, in which a custom Source application has a scheduler that sends the current time. When I deploy and run this code, the Source does not seem to be autowired (this.source shows as null), and I get the error below on the following line of code.

 this.source.output().send(MessageBuilder.withPayload(new
 Date().getTime()).build()

ERROR STACK

org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[13], headers={contentType=application/json, id=44e58c73-15b5-b250-4db4-7b617bdaa27e, timestamp=1568127722030}], failedMessage=GenericMessage [payload=byte[13], headers={contentType=application/json, id=44e58c73-15b5-b250-4db4-7b617bdaa27e, timestamp=1568127722030}]

SOURCE CODE

// Spring Boot application that binds the Source interface and is intended to
// publish the current time (epoch milliseconds) on the Source's output channel.
// NOTE(review): @EnableScheduling is present, but no @Scheduled method is visible in this snippet.
@EnableScheduling
@EnableBinding(Source.class)
@SpringBootApplication
public class MSource {

    Logger logger = LoggerFactory.getLogger(MSource.class);

    // Binding interface whose output() channel is used by sendMessage().
    @Autowired
    private Source source;

    // Polled adapter targeting Source.OUTPUT; the poller is configured with a
    // fixed delay of "10000" and up to "10000" messages per poll.
    // NOTE(review): the annotated method returns void, so it supplies no payload to the
    // adapter and no object for @Bean to register — confirm this combination is intended.
    // NOTE(review): the adapter's target (Source.OUTPUT) is the same channel the method
    // body sends to explicitly — confirm that both are needed.
    @Bean
    @InboundChannelAdapter(
            value = Source.OUTPUT,
            poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "10000")
    )

    public  void sendMessage() {       
      // Build a message whose payload is the current epoch-millisecond time and send it on the output channel.
      this.source.output().send(MessageBuilder.withPayload(new Date().getTime()).build());
    }

application yml entry

# Destination that the "output" binding is mapped to.
spring.cloud.stream.bindings.output.destination: pocqueue
# NOTE(review): "group" is documented as a consumer-side (inbound) binding property;
# confirm whether it has any effect on this output binding.
spring.cloud.stream.bindings.output.group: poc

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <groupId>poc</groupId>
    <artifactId>server</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <parent>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>app-starters-build</artifactId>
        <version>2.1.0.RELEASE</version>
        <relativePath/>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <!-- Each property is declared once; the repeated encoding/java.version entries were removed. -->
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <packaging.type>jar</packaging.type>
        <spring.boot.maven.plugin.version>2.1.2.RELEASE</spring.boot.maven.plugin.version>
    </properties>

    <dependencies>

        <!-- RabbitMQ binder starter for Spring Cloud Stream. -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            <version>2.1.1.RELEASE</version>
        </dependency>

        <!-- NOTE(review): this is the same artifact as the <parent>, but at 2.1.1.RELEASE
             instead of 2.1.0.RELEASE; confirm whether it is needed as a dependency at all. -->
        <dependency>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>app-starters-build</artifactId>
        <version>2.1.1.RELEASE</version>
        <type>pom</type>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <!-- Declared once; the duplicate jetty and reactor-test entries were removed. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jetty</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <version>3.2.9.RELEASE</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <profiles>
        <!-- Optional profile that adds the Rabbit binder and tags the repackaged jar with the "rabbit" classifier. -->
        <profile>
            <id>rabbit-binder</id>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
                </dependency>
            </dependencies>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-maven-plugin</artifactId>
                        <configuration>
                            <classifier>rabbit</classifier>
                        </configuration>
                    </plugin>
                </plugins>
            </build>
        </profile>
    </profiles>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2

2 Answers

0
votes

Given that you're using @EnableScheduling with the @Autowired Source channel, you can refactor your code to:

@EnableScheduling
@EnableBinding(Source.class)
@SpringBootApplication
public class MSource {

    Logger logger = LoggerFactory.getLogger(EdpiSource.class);

    @Autowired
    private Source source;

    @Scheduled(fixedRate = 1000L) 
    public  void sendMessage() {       
      this.source.output().send(MessageBuilder.withPayload(new Date().getTime()).build());
    }

This should be sufficient to trigger the production of the payload on a 1-second time interval continuously.

0
votes

The issue was that, locally, no binding was happening: I had specified the RabbitMQ binding in the configuration, so the application expected a local RabbitMQ instance to be available, which was not the case.