0
votes

Hi all, and especially the Spring team!

How can I pipeline spring-cloud-function with spring-cloud-stream in the functional bean programming model?

For example, I have a pom.xml with both dependencies:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-function-webflux</artifactId>
</dependency>

and let's say I would like to do the following:

  1. send a string payload over HTTP via spring-cloud-function (webflux)
  2. uppercase it using my toUpperCase function
  3. and finally forward the result through my pipeline to the installed binder (kafka/rabbit/test-binder)

so I expect to implement it like this:

@Log4j2
@SpringBootApplication
public class SpringCloudFunctionStreamApplication {

  /**
   * Can I send the result of this function to my broker without an
   * explicit output.send(...) call?
   */
  @Bean
  public Function<String, String> toUpperCase() {
    return arg -> {
      var res = arg.toUpperCase();
      log.info("toUpperCase: {}", res);
      return res;
    };
  }

  public static void main(String[] args) {
    SpringApplication.run(
      SpringCloudFunctionStreamApplication.class,
      "--spring.cloud.function.definition=toUpperCase",
      "--spring.cloud.stream.function.definition=toUpperCase"
    );
  }
}

When I use HTTPie to send a payload, like so:

echo 'hello' | http :8080/toUpperCase

spring-cloud-function seems to work fine, and I can see the expected log:

2019-06-09 21:20:36.978 ...SpringCloudFunctionStreamApplication : toUpperCase: hello

The same thing happens if I publish a message via the RabbitMQ management web UI. But how can I pipeline from one to the other?

My question relates to the Spring documentation, which says that I can use spring-cloud-stream as well: "Wrappers for @Beans of type Function, Consumer and Supplier, exposing them to the outside world as either HTTP endpoints and/or message stream listeners/publishers with RabbitMQ, Kafka etc." But I cannot understand how.

At the moment, unfortunately, I can only publish messages to the spring-cloud-stream binder manually using Source (see example here). That is exactly what I would like to avoid, if Spring can do it for me, magically...

Could anybody please tell me (maybe Gary Russell, Dave Syer, Artem Bilan, Oleg Zhurakousky, or anybody else who knows): what am I missing, and how should I configure my app, or which properties should I add to my application.properties, etc.?

Thanks!


Regards, Maksim

3
Maksim, if I understand correctly, you want http -> function(s) -> rabbit, correct? - Oleg Zhurakousky
Hello Oleg! Yes, I just want to provide function definitions for both spring-cloud-function and spring-cloud-stream and configure the data flow pipeline somewhere, so that whenever spring-cloud-function is triggered by someone via REST, its output is forwarded to spring-cloud-stream according to the configuration, using rabbit/kafka. At the moment I can only forward data from the function into a queue manually, but it would be awesome if I did not have to inject Source and use it to pass data into the queue by hand. - Maksim Kostromin

3 Answers

1
votes

Maksim

This is not ideal, but given that we implemented the initial function support within the scope of the existing binders, there are some limitations. I'll explain, but first here is the fully functioning code:

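import java.util.function.Consumer;
import java.util.function.Function;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
// Assumption: the Processor binding must be enabled for the injection below to work.
@EnableBinding(Processor.class)
public class SimpleFunctionRabbitDemoApplication {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(SimpleFunctionRabbitDemoApplication.class,
            "--spring.cloud.stream.function.definition=uppercase");
    }

    // Holds the input/output channels that uppercase is bound to.
    @Autowired
    private Processor processor;

    // Exposed over HTTP; forwards the payload to the input channel of uppercase.
    @Bean
    public Consumer<String> consume() {
        return v -> processor.input().send(MessageBuilder.withPayload(v).build());
    }

    // Bound by the binder as input -> uppercase -> output.
    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }
}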

Basically, there is a bit of a mismatch. On the stream side we have binders, and on the function side we have adapters. With your requirement, you are effectively attempting to bridge the two into a pipeline. So...

Let's look at the binders first.

The uppercase function is bound to the input and output channels provided by the message channel binders (rabbit or kafka), effectively creating an internal pipeline input -> uppercase -> output. It is also exposed as a REST endpoint by s-c-function; however, s-c-function does not have access to that pipeline. In fact, it has its own pipeline request -> uppercase -> reply. So what we need to do is bridge the two concepts together, and that is effectively what I did.

  • You inject the Processor binding into your app; it holds references to the channels that uppercase is bound to.

  • You invoke consume() via REST: http://localhost:8080/consume/blah.

  • You send a Message to the input channel of the uppercase function.

In the future, to simplify this, we simply need to create a binder-like version of the web adapter, so please feel free to raise a feature request. But as you can see, the current workaround is not all that involved.
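
To make the bridging idea concrete, here is a minimal plain-JDK model of the two pipelines described above. It is only a sketch: no Spring classes are involved, and the names BridgeSketch, input, output and httpUppercase are hypothetical stand-ins for the binder channels and the web adapter, not real API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-JDK model of the two pipelines; all names here are illustrative assumptions.
class BridgeSketch {

    // Stand-in for the broker destination behind the "output" channel.
    static final List<String> output = new ArrayList<>();

    // The one function that both pipelines wrap.
    static final Function<String, String> uppercase = String::toUpperCase;

    // Binder pipeline: input -> uppercase -> output.
    static final Consumer<String> input = payload -> output.add(uppercase.apply(payload));

    // Web pipeline: request -> uppercase -> reply. Nothing reaches "output".
    static String httpUppercase(String request) {
        return uppercase.apply(request);
    }

    // The bridge: a web-exposed consumer that forwards into the binder pipeline.
    static final Consumer<String> consume = input::accept;

    public static void main(String[] args) {
        System.out.println(httpUppercase("hello")); // the reply goes back to the caller only
        System.out.println(output);                 // still empty
        consume.accept("blah");                     // like calling /consume/blah
        System.out.println(output);                 // now holds the uppercased payload
    }
}
```

Calling the function directly over HTTP never touches the output destination; only the consume bridge pushes data into the binder-side pipeline, which is the role the consume() bean plays in the code above.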

0
votes

This is really a question for Oleg Zhurakousky. I would be glad to get an answer.

If I use @Bean Supplier<Pojo>... for binding the output destination, how do I invoke it from a @Service or @Controller class every time a new Pojo is to be sent to Kafka/Rabbit?

Supplier only exposes a get() method.

I am writing only the producer, which will write a custom Pojo to Kafka; a different application is the consumer. The functional approach is clear for a Consumer<Pojo>..., which would just read from Kafka and process the data. The Supplier<Pojo>... part for the producer is not clear.

https://www.youtube.com/watch?v=nui3hXzcbK0&t=3478s

0
votes

@Abhishek

You can utilise EmitterProcessor as described here. The provided example uses a REST endpoint as the actual source of data, but as you can see it doesn't matter, since all you need to do is invoke the onNext operation of the EmitterProcessor, passing your event from the Service.
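
The shape of that pattern can be sketched with the JDK alone. This is an analogy, not Reactor's API: SimpleEmitter is a hypothetical stand-in for EmitterProcessor, and in the real setup the Supplier would presumably return the reactive stream (for example a Flux backed by the EmitterProcessor). The point it illustrates is that get() is called once to hand over the stream, while the service pushes each event with onNext.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Supplier;

// JDK-only analogy; SimpleEmitter and Pojo are hypothetical, not library classes.
class EmitterSketch {

    // Minimal synchronous emitter standing in for Reactor's EmitterProcessor.
    static class SimpleEmitter<T> {
        private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        void onNext(T event) {
            subscribers.forEach(s -> s.accept(event));
        }
    }

    static final class Pojo {
        final String name;

        Pojo(String name) {
            this.name = name;
        }
    }

    static final SimpleEmitter<Pojo> emitter = new SimpleEmitter<>();

    // Supplier bean analogue: get() hands the stream over once, not once per event.
    static final Supplier<SimpleEmitter<Pojo>> supplier = () -> emitter;

    // Service analogue: called every time a new Pojo should be published.
    static void send(Pojo pojo) {
        emitter.onNext(pojo);
    }

    public static void main(String[] args) {
        List<String> published = new ArrayList<>();
        // The framework side subscribes once to whatever the supplier returns.
        supplier.get().subscribe(p -> published.add(p.name));
        send(new Pojo("first"));
        send(new Pojo("second"));
        System.out.println(published); // both events, in the order they were sent
    }
}
```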