
I am using Spring Cloud Stream to consume messages from Kafka. When a message is produced to Kafka, all of my consumers are invoked.

However, the Kafka documentation says that when consumers share a group, only one of them consumes each message.

This is my consumer code:

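import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding(Sink.class)
public class Consumer2 {

    // First handler subscribed to the input channel
    @StreamListener(target = Sink.INPUT)
    public void consume(String message) {
        System.out.println("33333");
    }

    // Second handler subscribed to the same input channel
    @StreamListener(target = Sink.INPUT)
    public void consume1(String message) {
        System.out.println("444444");
    }
}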

And this is my configuration, but both of my methods are called:

spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        binder:
          brokers:
            - localhost:9092
      bindings:
        input:
          binder: kafka
          destination: abbas
          content-type: text/plain
          group: input-group-1

        output:
          binder: kafka
          destination: abbas
          group: output-group-1
          content-type: text/plain

2 Answers

0
votes

With that configuration you have only one consumer (the Sink.INPUT binding), not two. @StreamListener is not a consumer; it is a programming model for handling inbound messages.

That is why Spring delivers each inbound message to both of your @StreamListener methods, which are bound to the same sink.
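
To illustrate the point, here is a minimal plain-Java sketch of one input channel with two handlers subscribed to it. `InputChannelSketch`, `subscribe`, and `deliver` are hypothetical names invented for this illustration; they are not Spring Cloud Stream classes, and the real dispatch logic is more involved. The sketch only shows why a single consumer with two handlers ends up calling both methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a single input binding; NOT a Spring class.
class InputChannelSketch {
    // Every handler method registered on the same channel ends up in this list.
    private final List<Consumer<String>> handlers = new ArrayList<>();

    void subscribe(Consumer<String> handler) {
        handlers.add(handler);
    }

    // The one consumer receives the message once, then hands it to every handler.
    // Returns the number of handlers that were invoked.
    int deliver(String message) {
        for (Consumer<String> handler : handlers) {
            handler.accept(message);
        }
        return handlers.size();
    }

    public static void main(String[] args) {
        InputChannelSketch input = new InputChannelSketch();
        input.subscribe(message -> System.out.println("33333"));  // plays the role of consume
        input.subscribe(message -> System.out.println("444444")); // plays the role of consume1
        input.deliver("hello");
    }
}
```

In this model the consumer group never comes into play, because the group decides which consumer gets a message, not which handler inside that consumer runs.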

0
votes

Within a single instance, the group has no effect.

When running multiple instances of our application, every time there is a new message in an input channel, all subscribers will be notified.

Most of the time, we need the message to be processed only once. Spring Cloud Stream implements this behavior via consumer groups.

To enable this behavior, each consumer binding can use the spring.cloud.stream.bindings.<CHANNEL>.group property to specify a group name. See:

https://www.baeldung.com/spring-cloud-stream section 5.4
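
The group semantics described above can be sketched in plain Java. This is a simplified model under stated assumptions: `ConsumerGroupSketch`, `join`, and `publish` are hypothetical names, and round-robin selection stands in for Kafka's real partition assignment, which works differently. The sketch only shows the observable rule that every group receives each message, while only one member within a group does.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a topic with consumer groups; NOT the Kafka client API.
class ConsumerGroupSketch {
    // Group name -> inboxes of the application instances that joined the group.
    private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();
    // Group name -> counter used for round-robin selection (a simplification).
    private final Map<String, Integer> next = new LinkedHashMap<>();

    // An application instance joins a group and gets its own inbox back.
    List<String> join(String group) {
        List<String> inbox = new ArrayList<>();
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(inbox);
        next.putIfAbsent(group, 0);
        return inbox;
    }

    // Each group sees the message once; exactly one member of the group gets it.
    void publish(String message) {
        for (Map.Entry<String, List<List<String>>> entry : groups.entrySet()) {
            List<List<String>> members = entry.getValue();
            int counter = next.get(entry.getKey());
            members.get(counter % members.size()).add(message);
            next.put(entry.getKey(), counter + 1);
        }
    }

    public static void main(String[] args) {
        ConsumerGroupSketch topic = new ConsumerGroupSketch();
        List<String> instanceA = topic.join("input-group-1");
        List<String> instanceB = topic.join("input-group-1");
        List<String> other = topic.join("other-group");
        topic.publish("m1");
        topic.publish("m2");
        System.out.println("instanceA=" + instanceA + " instanceB=" + instanceB + " other=" + other);
    }
}
```

Two instances sharing input-group-1 split the messages between them, while the instance in the other group receives every message.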