0
votes

I have created 3 simple Spring cloud stream apps(Source/Processor/Sink) using Spring cloud function approach transferring Flux.

Source-app:

@SpringBootApplication
public class SourceApplication {

    public static void main(String[] args) {
        SpringApplication.run(SourceApplication.class, args);
    }

    @PollableBean
    public Supplier<Flux<String>> stringSupplier() {
        return () -> {
            String v1 = String.valueOf("abc");
            String v2 = String.valueOf("pqr");
            String v3 = String.valueOf("xyz");
            return Flux.just(v1, v2, v3);
        };
    }
}

Processor-app:

@SpringBootApplication
public class ProcessorApplication {

    @Bean
    public Function<Flux<String>, Flux<String>> uppercase() {
        return flux -> flux.map(value -> value.toUpperCase()).log();
    }

    public static void main(String[] args) {
        SpringApplication.run(ProcessorApplication.class, args);
    }
}

Sink-app:

@SpringBootApplication
public class SinkApplication {
    public static void main(String[] args) {
        SpringApplication.run(SinkApplication.class, args);
    }

    @Bean
    public Consumer<Flux<String>> log() {
        return flux -> {
            flux.subscribe(f -> System.out.println("Received data: " + f));
        };
    }
}

The dependencies that I have added are:

SpringBoot version = 2.2.6.RELEASE

implementation(platform("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR5"))
implementation(platform("org.springframework.cloud:spring-cloud-stream-dependencies:Horsham.SR5"))
implementation("org.springframework.cloud:spring-cloud-starter-stream-rabbit")
implementation("org.springframework.cloud:spring-cloud-starter-function-webflux:3.0.7.RELEASE")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.boot:spring-boot-starter-webflux")

I have registered these these apps in Spring Cloud Data Flow and deployed in a stream.

I am able to transmit data to these apps and receive output both via HTTP and via RabbitMQ individually. But, the message is not communicated across the apps(Source->Processor->sink). Am I missing any dependency, annotation or application property.

Currently my application property file is completely empty.

1

1 Answers

0
votes

You need to set spring.cloud.stream.function.bindings.-in-0=input. See https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/current/reference/html/spring-cloud-stream.html#_programming_model. We’re looking to automate this in a future Dataflow release.