
We have a source like the following, and we are using the Spring Cloud Stream Rabbit binder 3.0.1.RELEASE.

import java.util.function.Function;
import java.util.function.Supplier;

import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;

@Component
public class Handlers {

  private EmitterProcessor<String> sourceGenerator = EmitterProcessor.create();

  // Called from other beans to push data into the stream.
  public void emitData(String str) {
    sourceGenerator.onNext(str);
  }

  @Bean
  public Supplier<Flux<String>> generate() {
    return () -> sourceGenerator;
  }

  @Bean
  public Function<String, String> process() {
    return str -> str.toUpperCase();
  }
}

application.yml

spring:
  profiles: dev
  cloud:
    stream:
      function:
        definition: generate;process
        bindings:
          generate-out-0: source1
          process-in-0: source1
          process-out-0: processed

        bindingServiceProperties:
          defaultBinder: local_rabbit

      binders:
        local_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
                virtual-host: / 

When we call the emitData method, we do not see data in the RabbitMQ queue. We verified that the consumer binding works by sending messages directly to the consumer-linked queue through the RabbitMQ admin console, but the supplier binding is not working.

We also observed that a Supplier without Flux works fine with the same application.yml configuration. Are we missing any configuration here?

Even a test case with TestChannelBinderConfiguration works fine, as follows.

@Slf4j
@TestPropertySource(
        properties = {"spring.cloud.function.definition = generate|process"}
)
public class HandlersTest extends AbstractTest {

  @Autowired
  private Handlers handlers;

  @Autowired
  private OutputDestination outputDestination;

  @Test
  public void testGeneratorAndProcessor() {
      final String testStr = "test";
      handlers.emitData(testStr);

      final Message<byte[]> message = outputDestination.receive(1000);

      assertNotNull(message, "processing timeout");
      assertEquals(testStr.toUpperCase(), new String(message.getPayload()));
  }
}
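
For reference, the AbstractTest base class is not shown in the post; a minimal sketch of a typical shape for it, assuming the standard test-binder setup, would be:

import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Import;

// The test binder replaces the Rabbit binder, so no broker is needed.
@SpringBootTest
@Import(TestChannelBinderConfiguration.class)
public abstract class AbstractTest {
}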
Comments:

Oleg Zhurakousky: How do you invoke emitData? To be specific, how do you get a reference to Handlers?

Mayank Rupareliya: Handlers is used as a normal autowired bean from another class to call the emitData method.
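
For illustration, such a caller might look like the following; this is only a sketch, and DataController and the /emit endpoint are hypothetical:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class DataController {

    @Autowired
    private Handlers handlers;   // hypothetical wiring of the Handlers bean

    // Pushes the request body into the reactive supplier.
    @PostMapping("/emit")
    public void emit(@RequestBody String body) {
        handlers.emitData(body);
    }
}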

3 Answers

Answer 1:

When you say "we are not seeing data in RabbitMQ queue", which queue are you talking about? When using AMQP, messages are sent to exchanges, and if such an exchange is not bound to any queue, the message is dropped; hence my question. Did you actually bind the generate-out-0 exchange to a queue?
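
For reference, one way to create such a binding programmatically is with Spring AMQP declarations. This is a minimal sketch; the queue name source1.inspect and the catch-all routing key are illustrative assumptions, not part of the original post:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class Source1BindingConfig {

    // The binder creates "source1" as a topic exchange by default.
    @Bean
    public TopicExchange source1Exchange() {
        return new TopicExchange("source1");
    }

    // A durable queue to capture what the supplier publishes (name is made up).
    @Bean
    public Queue source1Queue() {
        return new Queue("source1.inspect");
    }

    // Bind the queue to the exchange with a catch-all routing key,
    // so messages sent to the exchange are no longer dropped.
    @Bean
    public Binding source1Binding(Queue source1Queue, TopicExchange source1Exchange) {
        return BindingBuilder.bind(source1Queue).to(source1Exchange).with("#");
    }
}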

In any event, I just tested it and all is working as expected. Here is the complete code.

import java.util.function.Supplier;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;

import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;

@SpringBootApplication
public class SimpleStreamApplication {

    public static void main(String[] args) throws Exception {
        ApplicationContext context = SpringApplication.run(SimpleStreamApplication.class);
        SimpleStreamApplication app = context.getBean(SimpleStreamApplication.class);
        app.emitData("Hello");
    }

    private EmitterProcessor<String> sourceGenerator = EmitterProcessor.create();

    public void emitData(String str) {
        sourceGenerator.onNext(str);
    }

    @Bean
    public Supplier<Flux<String>> generate() {
        return () -> sourceGenerator;
    }
}
Answer 2:

While I appreciate you posting a project, unfortunately your story continues to change, and I am still not sure what it is that you want to accomplish. So this is my last response, but I'll try to be as detailed and as informative as I can. Here is what I see from your project.

  1. Your configuration is faulty. The definition property for functions should be spring.cloud.function.definition:

. . .

spring:
  cloud:
    function:
       definition: generate;process;sink

. . .

  2. Since you are using ;, I am assuming you want all three functions to be bound independently (no function composition), as described in the multiple bindings section.

  3. The spring.cloud.stream.function.bindings property allows you to map a generated binding name to a custom binding name, as described in Function Binding Names. It has nothing to do with the names of the actual destinations. For that we have the destination property, which is also covered in the referenced section (e.g., --spring.cloud.stream.bindings.generate-out-0.destination=source1). If the destination property is not used, the binding name and the destination name are assumed to be the same. A consumer destination, however, also requires a group name, and if one is not provided it is generated. So, based on your configuration, your generate-out-0 supplier is bound to the source1 exchange:

[screenshot: the source1 exchange in the RabbitMQ management console]

The input of the process-in-0 function, on the other hand, is bound to a source1.anonymous... queue:

[screenshot: the source1.anonymous... queue in the RabbitMQ management console]

And as I stated earlier, there is no RabbitMQ binding between the source1 exchange and the source1.anonymous... queue, therefore messages sent to the source1 exchange are simply dropped. By creating such a binding (e.g., via the RabbitMQ console), the messages would reach the consumer.
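
For completeness, giving the consumer an explicit group is one way to get a durable, predictably named queue that the binder binds to the exchange for you. This is a minimal sketch; the group name processors is made up:

spring:
  cloud:
    function:
      definition: generate;process
    stream:
      bindings:
        generate-out-0:
          destination: source1
        process-in-0:
          destination: source1
          group: processors   # yields a durable queue named source1.processors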

That said, such a design is very inefficient. Why do you want to send to and receive from the same destination while in the same process space (JVM)? Why abuse the network when you can simply pass by reference? So at the very least, change the definition to spring.cloud.function.definition=generate|process|sink to use function composition. A better solution would simply be to write your code in the supplier itself:

public void emitData(String str) {
    // Do the processing inline instead of round-tripping through the broker.
    String uppercased = str.toUpperCase();
    sourceGenerator.onNext(uppercased);
    System.out.println("Emitted: " + uppercased);
}

and be done with it. Anyway, I would strongly suggest you go over our user guide, specifically the Main Concepts and Programming Model sections, as I believe you have misunderstood certain core concepts, and that this contributes to the inconsistencies in both your post and your questions.

Answer 3:

We made some changes in the code, but the issue is still here: the Flux implementation of the supplier is not working, while the non-Flux supplier works fine.


    @Bean
    public Supplier<Flux<String>> generate_flux() {
        return () -> sourceGenerator;
    }

    @Bean
    public Supplier<Message<?>> generate_non_flux() {
        // Note: this method reference captures a single MessageBuilder, so the
        // payload (including its timestamp) is computed once, when the bean is created.
        return MessageBuilder
           .withPayload("Non flux emitter: " + LocalDateTime.now().toString())::build;
    }

The full source is in the same place.

We also changed application.yml as you suggested and ran some experiments. Thank you for the explanation of what the destinations mean. But we also checked, and we can say that RabbitMQ automatically links outputs and consumers that share the same destination, with any specified group names; it works both for explicitly specified groups and for randomly generated ones. This is not about parallel processing; this is about RabbitMQ's ability to link them.

Both generate_flux and generate_non_flux are connected to the same output destination:

      bindings:
        generate_flux-out-0:
          destination: source
        generate_non_flux-out-0:
          destination: source

Now the output of the application is:

Consumed: NON FLUX EMITTER: 2020-01-09T13:38:49.761801
Flux emitted: 2020-01-09T13:38:51.721094
Consumed: NON FLUX EMITTER: 2020-01-09T13:38:49.761801
Flux emitted: 2020-01-09T13:38:52.725961
Consumed: NON FLUX EMITTER: 2020-01-09T13:38:49.761801
Flux emitted: 2020-01-09T13:38:53.727054
Consumed: NON FLUX EMITTER: 2020-01-09T13:38:49.761801
Flux emitted: 2020-01-09T13:38:54.727898
Consumed: NON FLUX EMITTER: 2020-01-09T13:38:49.761801
Consumed: NON FLUX EMITTER: 2020-01-09T13:38:49.761801

There are processed NON FLUX messages, but there are no Flux ones.

So, the non-Flux emitter works fine, but we cannot use it to emit on request. The Flux implementation of the supplier doesn't work. That is where we started, and we have not changed the description of the task.

As for our splitting of the code into supplier, processor, and sink, we are talking about different types of machines. The supplier is legacy code that generates data. The processor is the memory-consuming part of the workflow, and we want to keep it on a separate set of VMs with the ability to scale it in Kubernetes. The sink, in our case, is a specific machine that stores data into a DB. At the same time, because of the legacy code, we want to keep the application's code common in general and not split it into separate applications like Apache Beam-based ones.