[Diagram: the scenario described below]

I am trying to achieve the above scenario using a Spring Cloud Stream supplier and consumers.

  1. This is a single Spring Boot app containing both the producer and the consumers.
  2. There is one producer and (potentially) multiple consumers. All consumers should act as competing clients of the same queue: each message should be received by exactly one consumer, and the other consumers should receive different messages.
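
The delivery semantics asked for in point 2 can be sketched in plain Java, without any broker. This is only an in-memory analogy under stated assumptions: the class name `CompetingConsumersSketch`, the `deliver` helper, and the worker names are hypothetical, and a `BlockingQueue` stands in for the shared queue that a consumer group would provide.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class CompetingConsumersSketch {

    // Runs `workers` consumers against one shared queue and returns
    // a log of "<worker>:<message>" entries (hypothetical helper).
    static List<String> deliver(List<String> messages, int workers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(messages);
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 1; i <= workers; i++) {
            String name = "sink" + i;
            Consumer<String> sink = message -> log.add(name + ":" + message);
            pool.submit(() -> {
                String message;
                // poll() removes the head of the queue, so a message taken
                // by one worker is never seen by another worker.
                while ((message = queue.poll()) != null) {
                    sink.accept(message);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return log;
    }

    public static void main(String[] args) {
        deliver(List.of("m1", "m2", "m3"), 2).forEach(System.out::println);
    }
}
```

Which worker ends up with which message is not deterministic; the only guarantee the sketch demonstrates is that every message is handled exactly once.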

Below is the Java class:

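import java.util.Date;
import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class MultipleFunctionsApplication {

    // First consumer function, bound through sink1-in-0
    @Bean
    public Consumer<String> sink1() {
        return message -> {
            System.out.println(new Date() + "----------->>> sink1 - Received message " + message);
        };
    }

    // Second consumer function, bound through sink2-in-0
    @Bean
    public Consumer<String> sink2() {
        return message -> {
            System.out.println(new Date() + "----------->>> sink2 - Received message " + message);
        };
    }

}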

I am trying to use the consumer group feature to achieve this, configured as follows.

spring:
  cloud:
    stream:
      bindings:
        requester1:
          destination: rss-exchange
          group: requester
        requester2:
          destination: rss-exchange
          group: requester
      function:
        bindings:
          sink1-in-0: requester1
          sink2-in-0: requester2          
        definition: sink1;sink2
  application:
    name: rss

When I start the application, I get the following error.

Caused by: org.springframework.beans.factory.support.BeanDefinitionOverrideException: Invalid bean definition with name 'rss-exchange.requester.errors.recoverer' defined in null: Cannot register bean definition [Generic bean: class [org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer]; scope=singleton; abstract=false; lazyInit=null; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] for bean 'rss-exchange.requester.errors.recoverer': There is already [Generic bean: class [org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer]; scope=singleton; abstract=false; lazyInit=null; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] bound.
at org.springframework.beans.factory.support.DefaultListableBeanFactory.registerBeanDefinition(DefaultListableBeanFactory.java:995) ~[spring-beans-5.3.5.jar:5.3.5]
at org.springframework.context.support.GenericApplicationContext.registerBeanDefinition(GenericApplicationContext.java:330) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.beans.factory.support.BeanDefinitionReaderUtils.registerBeanDefinition(BeanDefinitionReaderUtils.java:164) ~[spring-beans-5.3.5.jar:5.3.5]
at org.springframework.context.annotation.AnnotatedBeanDefinitionReader.doRegisterBean(AnnotatedBeanDefinitionReader.java:285) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.annotation.AnnotatedBeanDefinitionReader.registerBean(AnnotatedBeanDefinitionReader.java:233) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.annotation.AnnotationConfigApplicationContext.registerBean(AnnotationConfigApplicationContext.java:198) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:687) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:639) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder.createConsumerEndpoint(RabbitMessageChannelBinder.java:525) ~[spring-cloud-stream-binder-rabbit-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder.createConsumerEndpoint(RabbitMessageChannelBinder.java:136) ~[spring-cloud-stream-binder-rabbit-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:408) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
... 24 common frames omitted

From the logs it is clear that the binder is trying to register 'rss-exchange.requester.errors.recoverer' a second time. In this case only sink1 starts, and it receives every message, as shown below.

Fri Aug 27 15:01:22 IST 2021----------->>> sink1 - Received message {"channel":"chn19388","rssurl":"url random"}
Fri Aug 27 15:01:22 IST 2021----------->>> sink1 - Received message {"channel":"chnl02394","rssurl":"http://localhost:8080/test"}
Fri Aug 27 15:01:22 IST 2021----------->>> sink1 - Received message {"channel":"chnldkdjaif","rssurl":"url random"}
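
To make the collision concrete, here is a simplified model in plain Java. It is a sketch under assumptions, not the binder's actual code: `Registry` is a hypothetical stand-in for a bean registry, `registerBoth` is a hypothetical helper, and the name pattern `<destination>.<group>.errors.recoverer` is only read off the bean name in the stack trace above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RecovererNameCollisionSketch {

    // Simplified stand-in for a bean registry; NOT Spring's real implementation.
    static final class Registry {
        private final Map<String, String> beans = new LinkedHashMap<>();
        private final boolean allowOverriding;

        Registry(boolean allowOverriding) {
            this.allowOverriding = allowOverriding;
        }

        void register(String name, String owner) {
            if (beans.containsKey(name) && !allowOverriding) {
                throw new IllegalStateException("There is already a bean named '" + name + "'");
            }
            beans.put(name, owner); // with overriding allowed, the later entry wins
        }

        String ownerOf(String name) {
            return beans.get(name);
        }
    }

    // Name pattern inferred from the stack trace (assumption).
    static String recovererName(String destination, String group) {
        return destination + "." + group + ".errors.recoverer";
    }

    // Registers the recoverer for both bindings and returns the final owner.
    static String registerBoth(boolean allowOverriding) {
        Registry registry = new Registry(allowOverriding);
        String name = recovererName("rss-exchange", "requester");
        registry.register(name, "requester1");
        registry.register(name, "requester2"); // same destination and group, so same name
        return registry.ownerOf(name);
    }

    public static void main(String[] args) {
        try {
            registerBoth(false);
        } catch (IllegalStateException e) {
            System.out.println("Second binding rejected: " + e.getMessage());
        }
        System.out.println("With overriding allowed, owner is " + registerBoth(true));
    }
}
```

Because both bindings share the destination `rss-exchange` and the group `requester`, they map to one name, and the second registration fails unless overriding is permitted.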

When I add "allow-bean-definition-overriding: true", everything works as expected, as the logs below show.

Fri Aug 27 15:03:57 IST 2021----------->>> sink1 - Received message {"channel":"chn19388","rssurl":"url random"}
Fri Aug 27 15:03:57 IST 2021----------->>> sink2 - Received message {"channel":"chnl02394","rssurl":"http://localhost:8080/test"}
Fri Aug 27 15:03:57 IST 2021----------->>> sink2 - Received message {"channel":"chnldkdjaif","rssurl":"url random"}

I am not sure this is the right way to do it: the use case works once I set the overriding property, but without it I get the "bean already exists" error.

NOTE - It has only been a couple of days since I started exploring Spring Cloud Stream, so please bear with me if I have asked something silly.

I don't quite understand what you are trying to accomplish here. Why are you using a remote system when the producer and consumer are part of the same application? What is the point? - Oleg Zhurakousky
Hi @OlegZhurakousky, I understand your confusion. Since this program was inspired by github.com/spring-cloud/spring-cloud-stream-samples/blob/main/… I took the liberty of running everything on the same machine. I have updated the diagram in the question to give a clearer picture of what I want to achieve. - MAY
So there is an issue in stream. There is a workaround, but it is complicated, so we need to address it. Thankfully, we already have a PR for that - github.com/spring-cloud/spring-cloud-stream/pull/1388 . I also created an issue: github.com/spring-cloud/spring-cloud-stream/issues/2218. - Oleg Zhurakousky
Thanks, @OlegZhurakousky, for looking into this. Is it safe to use this approach? The use case I mentioned will be the core communication feature in the code I am trying to build. Do you suggest some other alternative, or should I wait for the bug to be resolved? - MAY

1 Answer

So the issue was fixed and tested with your configuration, merged, and is available in the current snapshot (3.2.0-SNAPSHOT).