I want to stop the RabbitMQ consumers that my Spring Cloud Stream bindings create when the /prepare-for-shutdown endpoint is hit. My configuration is below.

Dependency added to pom.xml:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

Application.yml:

spring:
  cloud: 
    stream:
      bindings: 
        produceChannel: 
          binder: rabbit
          content-type: application/json
          destination: internal-exchange
        consumeChannel: 
          binder: rabbit
          content-type: application/json
          destination: internal-exchange
          group: small-queue
      rabbit:
        bindings:
          consumeChannel:
            consumer:
              autoBindDlq: true
              durableSubscription: true
              requeueRejected: false
              republishToDlq: true
              bindingRoutingKey: admin
          produceChannel: 
            producer:               
              routingKeyExpression: '"admin"'

Sample.java:

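import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;

public interface Sample {
    // Inbound channel for the consumeChannel binding
    @Input("consumeChannel")
    SubscribableChannel consumeChannel();

    // Outbound channel for the produceChannel binding
    @Output("produceChannel")
    SubscribableChannel produceChannel();
}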

The integration with RabbitMQ uses Spring Cloud Stream's @StreamListener and @EnableBinding abstractions, as shown below:

@EnableBinding(Sample.class)


@StreamListener("consumeChannel")
public void sampleMessage(String message) {
    // code
}

How can I stop a consumer of a RabbitMQ queue programmatically? Thanks in advance.

You can do it programmatically via the actuator. See this answer. - Gary Russell
But I am getting null values when calling gatherInputBindings() in the BindingsEndpoint class. Did I miss any code or configuration? - Vanitha V
When I access /actuator/bindings, it returns empty values. Please advise, @GaryRussell. - Vanitha V
I can't explain that; it works fine for me (and everyone else). You must have some incorrect configuration. If you can provide a small, complete, verifiable example that exhibits this behavior, I can take a look. - Gary Russell
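
For reference, the bindings endpoint discussed in these comments is only reachable over HTTP when it is exposed. The fragment below is a minimal sketch of that setting for application.yml. It assumes spring-boot-starter-actuator and a web starter are already on the classpath, which the question does not show, and it may or may not be related to the empty result reported above:

```yaml
# Assumption: actuator and a web starter are present in pom.xml
management:
  endpoints:
    web:
      exposure:
        include: bindings
```

With the endpoint exposed, a GET on /actuator/bindings lists the bindings, and a POST to /actuator/bindings/consumeChannel with the JSON body {"state":"STOPPED"} should stop the consumer for that binding.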

1 Answer

I investigated why the actuator endpoint '/actuator/bindings' returns empty values.

Hitting the actuator bindings endpoint invokes the method gatherInputBindings() in BindingsEndpoint.class.

In BindingsEndpoint.java, the binding values are read from inputBindingLifecycle:

(Collection<Binding<?>>) new DirectFieldAccessor(inputBindingLifecycle).getPropertyValue("inputBindings");

The methods below assign an empty list of bindings to inputBindings.

In InputBindingLifecycle.java,

void doStartWithBindable(Bindable bindable) {
    this.inputBindings = bindable.createAndBindInputs(bindingService);
}

In Bindable.java,

default Collection<Binding<Object>> createAndBindInputs(BindingService adapter) {
    return Collections.<Binding<Object>>emptyList();
}
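
A simplified model of the two snippets above may help show how inputBindings could end up empty. It is only a sketch with stand-in types: BindingOverwriteSketch, InputBindable, OutputOnlyBindable and Lifecycle are hypothetical names, bindings are modelled as plain strings, and it assumes the lifecycle visits more than one bindable, which the snippets themselves do not confirm.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class BindingOverwriteSketch {

    // Stand-in for Bindable: the default implementation returns no bindings.
    interface Bindable {
        default Collection<String> createAndBindInputs() {
            return Collections.emptyList();
        }
    }

    // Hypothetical bindable that overrides the default and has one input.
    static class InputBindable implements Bindable {
        @Override
        public Collection<String> createAndBindInputs() {
            return Collections.singletonList("consumeChannel");
        }
    }

    // Hypothetical bindable that keeps the default, so it reports no inputs.
    static class OutputOnlyBindable implements Bindable { }

    // Stand-in for the lifecycle: a plain assignment, as in the quoted snippet.
    static class Lifecycle {
        Collection<String> inputBindings = Collections.emptyList();

        void doStartWithBindable(Bindable bindable) {
            this.inputBindings = bindable.createAndBindInputs();
        }
    }

    // Starts every bindable in order and returns what the lifecycle holds afterwards.
    static Collection<String> startAll(List<Bindable> bindables) {
        Lifecycle lifecycle = new Lifecycle();
        bindables.forEach(lifecycle::doStartWithBindable);
        return lifecycle.inputBindings;
    }

    public static void main(String[] args) {
        // Only the overriding bindable: prints [consumeChannel]
        System.out.println(startAll(Collections.singletonList(new InputBindable())));
        // A default-only bindable is visited last: prints []
        System.out.println(startAll(Arrays.asList(new InputBindable(), new OutputOnlyBindable())));
    }
}
```

If this model matches what happens in the application, the empty result would come from the assignment being overwritten by a bindable that keeps the default, rather than from the consumer binding never being created.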

Please suggest how to fix this. Do I need to change a dependency, or some code or configuration?