My consumer configuration follows the Spring Cloud Stream consumer properties documentation.

spring-cloud-dependencies:Finchley.SR1
springBootVersion = '2.0.5.RELEASE'

The kstream_test topic has 4 partitions, and the producer has filled them with messages, as shown below:

root@kafka:/# kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kstream_test --time -1
kstream_test:2:222
kstream_test:1:203
kstream_test:3:188
kstream_test:0:278

My Spring Cloud Stream Kafka binder configuration is:

spring.cloud.stream.bindings.input:
  destination: kstream_test
  group: consumer-group-G1_test
  consumer:
    useNativeDecoding: true
    headerMode: raw
    startOffset: latest
    partitioned: true
    concurrency: 3

KStream listener class:

    @StreamListener
    @SendTo(MessagingStreams.OUTPUT)
    public KStream<?, ?> process(@Input(MessagingStreams.INPUT) KStream<?, ?> kstreams) {
        ......
        log.info("Got a message");
        ......
        return kstreams;
    }

My producer sends 100 messages per run. However, the logs show only one thread, StreamThread-1, handling the messages, even though I set concurrency to 3. What might be wrong here? Are 100 messages not enough to see the concurrency at play?

2018-10-18 11:50:01.923  INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler     : Got a message
2018-10-18 11:50:01.923  INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler     : Got a message
2018-10-18 11:50:01.945  INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler     : Got a message
2018-10-18 11:50:01.956  INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler     : Got a message
2018-10-18 11:50:01.972  INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler     : Got a message

UPDATE:

As per the answer, setting num.stream.threads at the binder level works:

spring.cloud.stream.kafka.streams.binder.configuration:
 num.stream.threads: 3

1 Answer

It seems that num.stream.threads needs to be set to increase the concurrency...

/** {@code num.stream.threads} */
@SuppressWarnings("WeakerAccess")
public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";

...it defaults to 1.
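
To illustrate why only StreamThread-1 shows up in the logs, here is a rough sketch of how the 4 partitions of kstream_test might be spread over stream threads. The class StreamThreadSketch, its assign method, and the round-robin placement are all hypothetical and only for illustration; the real assignment is decided by Kafka Streams, not by this code.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: spreads one unit of work per partition over N stream threads.
// Round-robin placement is an assumption; Kafka Streams makes the real assignment.
class StreamThreadSketch {

    // Returns, for each thread, the list of partition numbers it would handle.
    static List<List<Integer>> assign(int partitions, int numStreamThreads) {
        List<List<Integer>> threads = new ArrayList<>();
        for (int t = 0; t < numStreamThreads; t++) {
            threads.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            threads.get(p % numStreamThreads).add(p);
        }
        return threads;
    }

    public static void main(String[] args) {
        // num.stream.threads = 1 (the default): one thread handles every partition.
        System.out.println(assign(4, 1)); // prints [[0, 1, 2, 3]]
        // num.stream.threads = 3 with the 4 partitions of kstream_test.
        System.out.println(assign(4, 3)); // prints [[0, 3], [1], [2]]
    }
}
```

With a single thread, every message is logged by the same thread regardless of how many messages arrive; with three threads, one of them ends up with two partitions.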

The binder should really set that based on the ...consumer.concurrency property; please open a github issue to that effect against the binder.

In the meantime, you can just set that property directly in ...consumer.configuration.

CORRECTION

I've just been told that the ...consumer.configuration is not currently applied to the streams binder either; you would have to set it at the binder level.
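
For reference, a minimal sketch of what the binder-level setting amounts to on the Kafka Streams side, assuming the binder forwards its configuration map to Kafka Streams as plain properties. The class NumStreamThreadsSketch and its streamsProperties helper are hypothetical names used only for illustration; the key is the same string as NUM_STREAM_THREADS_CONFIG quoted above.

```java
import java.util.Properties;

// Sketch: builds the plain Kafka Streams property that controls the thread count.
class NumStreamThreadsSketch {

    // Same value as StreamsConfig.NUM_STREAM_THREADS_CONFIG.
    static final String NUM_STREAM_THREADS = "num.stream.threads";

    // Returns a Properties object carrying the requested number of stream threads.
    static Properties streamsProperties(int threads) {
        Properties props = new Properties();
        props.setProperty(NUM_STREAM_THREADS, Integer.toString(threads));
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProperties(3);
        System.out.println(NUM_STREAM_THREADS + "=" + props.getProperty(NUM_STREAM_THREADS));
    }
}
```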