0
votes

We are creating a rabbitMq consumer using the new spring cloud function library.

However we find that on startup of the application, we don't see the queues or exchanges create on the rabbitMq instance.

Here is our config.

spring:
  cloud:
    function:
      definition: someReceiver
    stream:
      binders:
        rabbit:
          type: rabbit
      bindings:
        someReceiver-in-0:
          consumer:
            max-attemps: 1
            batch-mode: true
          binder: rabbit
          destination: someExhange
          group: someQueue
      default-binder: rabbit
      rabbit:
        bindings:
          someReceiver-in-0:
            consumer:
              acknowledge-mode: MANUAL
              auto-bind-dlq: true
              queue-name-group-only: true
              exchange-type: topic
              max-concurrency: 10
              prefetch: 200
              enable-batching: true
              batch-size: 10
              receive-timeout: 200
              dlq-dead-letter-exchange:

This is our consumer.

 @Bean
    public Consumer<Message<Long>> someReceiver() {
        return ....
    }

In logs we can see :

o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel errorChannel
o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel nullChannel
o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageHandler _org.springframework.integration.errorLogger
o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
o.s.i.channel.PublishSubscribeChannel    : Channel 'X' has 1 subscriber(s).

The problem we are having is that no queue or exchange is being created on the rabbitMq broker on application startup. We were expecting that a queue named someQueue and an exchanged named someExchange should have been created on application startup

1

1 Answers

0
votes

Works as designed just directly from https://start.spring.io:

2021-04-19 12:02:07.476  INFO 28260 --- [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.someReceiver-in-0' has 1 subscriber(s).
2021-04-19 12:02:07.563  INFO 28260 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel errorChannel
2021-04-19 12:02:07.595  INFO 28260 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel nullChannel
2021-04-19 12:02:07.600  INFO 28260 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel someReceiver-in-0
2021-04-19 12:02:07.613  INFO 28260 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageHandler _org.springframework.integration.errorLogger
2021-04-19 12:02:07.631  INFO 28260 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-04-19 12:02:07.631  INFO 28260 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2021-04-19 12:02:07.631  INFO 28260 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2021-04-19 12:02:07.632  INFO 28260 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Creating binder: rabbit
2021-04-19 12:02:07.740  INFO 28260 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Caching the binder: rabbit
2021-04-19 12:02:07.740  INFO 28260 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Retrieving cached binder: rabbit
2021-04-19 12:02:07.792  INFO 28260 --- [           main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: someQueue, bound to: someExhange
2021-04-19 12:02:07.793  INFO 28260 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2021-04-19 12:02:07.938  INFO 28260 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#244e619a:0/SimpleConnection@73a0f2b [delegate=amqp://[email protected]:5672/, localPort= 51143]
2021-04-19 12:02:07.991  INFO 28260 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'someQueue.errors' has 1 subscriber(s).
2021-04-19 12:02:07.991  INFO 28260 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'someQueue.errors' has 2 subscriber(s).
2021-04-19 12:02:08.002  INFO 28260 --- [           main] o.s.i.a.i.AmqpInboundChannelAdapter      : started bean 'inbound.someQueue'
2021-04-19 12:02:08.010  INFO 28260 --- [           main] o.s.s.c.s.s.So67160902Application        : Started So67160902Application in 1.659 seconds (JVM running for 2.147)

And I see this in the Rabbit MQ Management Console:

enter image description here

You probably miss some dependency or do something else in your configuration to prevent RabbitMQ Binder to do its stuff.

This is my deps, which are in the pom just generated from https://start.spring.io:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
        <scope>test</scope>
        <classifier>test-binder</classifier>
        <type>test-jar</type>
    </dependency>
</dependencies>