
I am using Spring Cloud Stream with RabbitMQ and want to achieve consumer auto-scaling as explained here. With the configuration below I get 2 concurrent consumers, but SimpleMessageListenerContainer does not add consumers dynamically. I have checked that the SMLC instance is initialized with both the concurrency and max-concurrency properties. Is there anything I am missing to get dynamic scaling?

Note: I published 200 messages to the input1 exchange in RabbitMQ.

pom.xml file:

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test</groupId>
    <artifactId>consumerex</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>graphql</name>
    <url>http://maven.apache.org</url>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.2.RELEASE</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            <version>3.0.1.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <scope>provided</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

application.yml file:

spring:
  cloud:
    stream:
      bindings:
        input1:
          destination: input1
          binder: local_rabbit
          group: logMessageConsumers
          consumer:
            concurrency: 2
        input2:
          destination: input2
          binder: local_rabbit
          group: logMessageConsumers
      binders:
        local_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
                virtual-host: /
      rabbit:
        bindings:
          input1:
            consumer:
              maxConcurrency: 10
server:
  port: 0
management:
  health:
    binders:
       enabled: true

App.java file:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@SpringBootApplication
@EnableBinding(MyProcessor.class)
public class App 
{
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }

    @StreamListener("input1")
    public void enrichLogMessage(String log) {
        // Busy-wait to keep the current consumer occupied so that auto-scaling can happen.
        long time = System.currentTimeMillis() + 120000;
        while(time> System.currentTimeMillis()) {

        }
        System.out.println(Thread.currentThread().getName() + "    input1       "  + log);

    }

    @StreamListener("input2")
    public void input2(String log) {
        System.out.println(Thread.currentThread().getName() + "     input2         "  + log);
    }
}

MyProcessor.java file:

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface MyProcessor {
    @Input("input1")
    SubscribableChannel input1();

    @Input("input2")
    SubscribableChannel input2();

}

Comments:

The RabbitMessageChannelBinder does not support dynamic scaling. There must be at least one consumer per partition. The consumer's instanceIndex is used to indicate which partition is consumed. Platforms such as Cloud Foundry can have only one instance with an instanceIndex. cloud.spring.io/spring-cloud-static/… - Deepak Agrawal

Dynamic scaling within an instance is, indeed, supported; see my answer. - Gary Russell

1 Answer


The algorithm that increases the concurrency runs on the listener thread(s), so blocking a listener thread for that long (the 120-second busy-wait in your listener) prevents it from working.
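
To make that concrete, here is a rough model of the decision, written as plain Java so it runs without a broker. It is only a sketch under stated assumptions, not the Spring AMQP source: the class and method names are hypothetical, and the parameters mirror the container's consecutiveActiveTrigger and startConsumerMinInterval settings, for which I am assuming defaults of 10 and 10000 ms. The assumption being illustrated is that a consumer thread only counts a message as "active" after the listener returns, and only considers adding a consumer once enough consecutive active cycles have accumulated.

```java
public class ScalingSketch {

    /**
     * Simplified model (NOT the Spring AMQP implementation): returns the virtual
     * time, in milliseconds, at which a single listener thread would first ask
     * the container for an additional consumer.
     */
    static long firstScaleUpMs(long handlerMs, int consecutiveActiveTrigger,
            long startConsumerMinIntervalMs) {
        if (handlerMs <= 0 || consecutiveActiveTrigger <= 0) {
            throw new IllegalArgumentException(
                    "handlerMs and consecutiveActiveTrigger must be positive");
        }
        long now = 0;
        int consecutiveActive = 0;
        while (true) {
            now += handlerMs;      // the listener holds the thread for the whole call
            consecutiveActive++;   // counted only after the listener has returned
            if (consecutiveActive >= consecutiveActiveTrigger) {
                if (now >= startConsumerMinIntervalMs) {
                    return now;    // enough busy cycles and enough time since start
                }
                consecutiveActive = 0; // too early; start counting again
            }
        }
    }

    public static void main(String[] args) {
        // Listener that takes about 1 second per message.
        System.out.println("1 s handler:   " + firstScaleUpMs(1_000, 10, 10_000) + " ms");
        // Listener that busy-waits 120 seconds per message, as in the question.
        System.out.println("120 s handler: " + firstScaleUpMs(120_000, 10, 10_000) + " ms");
    }
}
```

Under this model a 1-second listener reaches the first scale-up check after 10,000 ms, while a 120-second listener needs 1,200,000 ms (20 minutes) before the thread even gets to consider adding a consumer. The real container's timing will differ in detail, but the dependence on the listener returning is the point.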

The following application shows it working as designed...

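import java.util.stream.IntStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
@EnableBinding(Sink.class)
public class So59723356Application {

    private static final Logger log = LoggerFactory.getLogger(So59723356Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So59723356Application.class, args);
    }

    // Each message takes about one second, so the listener thread returns regularly.
    @StreamListener(Sink.INPUT)
    public void listen(String in) throws InterruptedException {
        log.info(in);
        Thread.sleep(1_000);
    }

    // Publishes 50 test messages to the "input" exchange on startup.
    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> IntStream.range(0, 50).forEach(i -> template.convertAndSend("input", "input", "foo"));
    }

}

application.properties file:

spring.cloud.stream.bindings.input.group=group
spring.cloud.stream.bindings.input.consumer.concurrency=1
spring.cloud.stream.rabbit.bindings.input.consumer.max-concurrency=5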

and the resulting log:

2020-01-14 14:20:51.849  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:20:51.852  INFO 71536 --- [           main] com.example.demo.So59723356Application   : Started So59723356Application in 1.729 seconds (JVM running for 2.123)
2020-01-14 14:20:52.855  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:20:53.862  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:20:54.869  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:20:55.874  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:20:56.882  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:20:57.885  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:20:58.889  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:20:59.894  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:21:00.901  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:21:01.906  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:21:02.911  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:21:03.917  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:21:03.917  INFO 71536 --- [  input.group-2] com.example.demo.So59723356Application   : foo
2020-01-14 14:21:04.922  INFO 71536 --- [  input.group-2] com.example.demo.So59723356Application   : foo
2020-01-14 14:21:04.922  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:21:05.924  INFO 71536 --- [  input.group-2] com.example.demo.So59723356Application   : foo
2020-01-14 14:21:05.924  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:21:06.930  INFO 71536 --- [  input.group-2] com.example.demo.So59723356Application   : foo
2020-01-14 14:21:06.930  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo

Note the second consumer thread (input.group-2) starting at 14:21:03.917.