Note to duplicate markers: I DID check out the other question, but it does not answer my specific question below.

Imagine I have a Kafka topic on a single server with only one partition, so it behaves much like a queue.

Now let's assume I want 100 listeners waiting to accept values from the queue. As I understand it, if all 100 consumers are in a single group, the contents of the log (or queue here) will be distributed among the consumers, so the operation should be over in 1/100th of the time.

The problem is that the Spring Kafka listener is configured with only the topic name.

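import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class Consumer {

    // Subscribes to the topic named by the app.topic property.
    @KafkaListener(topics = "${app.topic}")
    public void receive(@Payload String message,
                        @Headers MessageHeaders headers) {
        System.out.println("Received message=" + message);
        // Print every header as key->value.
        headers.keySet().forEach(key -> System.out.println(key + "->" + headers.get(key)));
    }
}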

I can't seem to get Kafka to spawn 100 consumers for processing messages from the "queue" (log). How can it be done?

See the answer. Your question is not about Apache Kafka. There is no queue-like behavior in Kafka at all. You can't evenly distribute messages from the same partition in Kafka. There is an offset to deal with, and it must be processed in the proper order. So, if you want a queue, don't go to Kafka! - Artem Bilan
Stop mixing up "queue" and "topic"; they are in no way similar. Kafka uses "topic". - DwB

1 Answer

Check out this answer for an understanding of Kafka consumers: "In Apache Kafka why can't there be more consumer instances than partitions?"

To distribute messages among the consumers of a single group, the topic must have more than one partition, because each partition is assigned to at most one consumer in the group. Once you find the right number of partitions for your load, I would use Spring Cloud Stream to manage your concurrency and consumer group assignment.
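To make the partition limit concrete, here is a minimal sketch in plain Java (no Kafka dependency). It is a simplified round-robin model of group assignment, not Kafka's actual assignor code; the class and method names are hypothetical, and it assumes at least one consumer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of consumer-group assignment; NOT Kafka's real assignor.
class PartitionAssignmentSketch {

    // Hands out partitions round-robin; every partition gets exactly one owner.
    // Assumes consumers >= 1.
    static Map<Integer, List<Integer>> assign(int partitions, int consumers) {
        Map<Integer, List<Integer>> owned = new LinkedHashMap<>();
        for (int c = 0; c < consumers; c++) {
            owned.put(c, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            owned.get(p % consumers).add(p);
        }
        return owned;
    }

    // Counts consumers that ended up with no partition at all.
    static long idleConsumers(Map<Integer, List<Integer>> owned) {
        return owned.values().stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        // The setup from the question: one partition, 100 consumers in one group.
        System.out.println("1 partition, 100 consumers: idle = "
                + idleConsumers(assign(1, 100)));   // idle = 99
        // Enough partitions for every consumer to get work.
        System.out.println("100 partitions, 100 consumers: idle = "
                + idleConsumers(assign(100, 100))); // idle = 0
    }
}
```

In this model, 99 of the 100 consumers from the question never receive a message, which is why adding listeners alone does not speed anything up.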

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>

Sample of sink

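import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

// Note: @EnableBinding and @StreamListener belong to the annotation-based model,
// which newer Spring Cloud Stream releases deprecate in favor of functional bindings.
@SpringBootApplication
@EnableBinding(Sink.class)
public class LoggingConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(LoggingConsumerApplication.class, args);
    }

    // Invoked for each message arriving on the "input" binding.
    @StreamListener(Sink.INPUT)
    public void handle(Person person) {
        System.out.println("Received: " + person);
    }

    // Payload type the incoming message is converted to.
    public static class Person {
        private String name;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return this.name;
        }
    }
}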

Concurrency settings

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: <topic-name>
          group: <consumer-group>
          consumer:
            headerMode: raw
            partitioned: true
            # consumer threads per application instance
            concurrency: 20
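As a rough sanity check on the concurrency value, the sketch below estimates how many consumer threads can actually own a partition. It assumes every application instance joins the same consumer group and reads a single topic; the class and method names are hypothetical and this is plain arithmetic, not a Spring or Kafka API.

```java
// Back-of-the-envelope sizing for the concurrency setting; not a Spring API.
class ConcurrencySketch {

    // Threads that can own at least one partition across all instances.
    static int activeThreads(int partitions, int instances, int concurrencyPerInstance) {
        int totalThreads = instances * concurrencyPerInstance;
        return Math.min(totalThreads, partitions);
    }

    // Threads left without any partition to read from.
    static int idleThreads(int partitions, int instances, int concurrencyPerInstance) {
        int totalThreads = instances * concurrencyPerInstance;
        return totalThreads - activeThreads(partitions, instances, concurrencyPerInstance);
    }

    public static void main(String[] args) {
        // One instance with concurrency 20 against a single-partition topic.
        System.out.println("active=" + activeThreads(1, 1, 20)
                + " idle=" + idleThreads(1, 1, 20));   // active=1 idle=19
        // The same instance against a 20-partition topic.
        System.out.println("active=" + activeThreads(20, 1, 20)
                + " idle=" + idleThreads(20, 1, 20));  // active=20 idle=0
    }
}
```

Under these assumptions, raising concurrency past the partition count only adds idle threads.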

More information is available here: https://cloud.spring.io/spring-cloud-stream/