
I am creating consumers (each consumer group has a single consumer in it):

    Properties properties = new Properties();
    properties.put("zookeeper.connect","localhost:2181");
    properties.put("auto.offset.reset", "largest");
    properties.put("group.id", groupId);
    properties.put("auto.commit.enable", "true");
    ConsumerConfig consumerConfig = new ConsumerConfig(properties);
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
    consumerMap.entrySet().stream().forEach(
        streams -> {
            streams.getValue().stream().forEach(
                stream -> {
                    KafkaBasicConsumer customConsumer = new KafkaBasicConsumer();
                    try {
                        Future<?> consumerFuture = kafkaConsumerExecutor.submit(customConsumer);
                        kafkaConsumersFuture.put(groupId, consumerFuture);
                    } catch (Exception e) {
                        logger.error("---- Got error : " + e.getMessage());
                        logger.error("Exception : ", e);
                    }
                }
            );
        }
    );

I have subscribed 2 consumers to the same topic. I unsubscribe a consumer by storing its Future object and then invoking consumerFuture.cancel(Boolean.TRUE);

Now I subscribe the same consumer again with the above code and it gets registered successfully. However, when the publisher now publishes, the newly subscribed consumer does not get messages, whereas the other consumer, which stayed registered, does.

I am also checking the consumer offsets. They are updated when the producer publishes, but the consumers are not getting messages.

Before producing:

    Group  Topic  Pid  Offset  logSize  Lag
    A      T1     0    94      94       1
    B      T1     0    94      94       1

After producing:

    Group  Topic  Pid  Offset  logSize  Lag
    A      T1     0    95      97       2
    B      T1     0    94      97       2

I am not able to figure out whether this is an issue on the producer side (not enough partitions) or whether I have created the consumer incorrectly. Also, I am not able to figure out what the logSize and Lag columns mean in this output.

Let me know if anyone can help or needs more details.

How many partitions do you have per topic? - Nautilus
I have 3 partitions per topic - nikhil7610
Are you still fighting with the same problem? It's been almost 2 months - Nautilus

1 Answer


I found the solution to my problem, thanks @nautilus for reminding me to update.

My main intent was to provide an endpoint to subscribe and unsubscribe a consumer in Kafka. Since Kafka provides only subscribing and not unsubscribing (possible only manually), I had to write a layer over the Kafka implementation.

I stored the consumer object in a static map with the group id as key (since my consumer group can have only one consumer).

The problem was that I was not closing the consumer when unsubscribing, so the old consumer with the same group id was preventing the new one from getting messages.
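To illustrate why cancelling the future alone was not enough, here is a minimal JDK-only sketch. `FakeConnector` and `CancelVsShutdownDemo` are hypothetical names, and `FakeConnector` is only a stand-in for the Kafka `ConsumerConnector` (it just records whether `shutdown()` was called). The sketch assumes the worker blocks the way a consumer loop would; it makes no claim about how the real Kafka stream iterator reacts to an interrupt.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for ConsumerConnector: only tracks whether shutdown() ran.
final class FakeConnector {
    private final AtomicBoolean open = new AtomicBoolean(true);
    void shutdown() { open.set(false); }
    boolean isOpen() { return open.get(); }
}

final class CancelVsShutdownDemo {
    // Starts a blocking worker, cancels its Future, and reports whether the
    // connector is still open afterwards.
    static boolean connectorOpenAfterCancel() {
        FakeConnector connector = new FakeConnector();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        try {
            Future<?> future = executor.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(Long.MAX_VALUE); // stands in for blocking on the stream
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // worker exits; connector untouched
                }
            });
            started.await();
            future.cancel(true); // interrupts the worker thread, nothing more
            return connector.isOpen();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // prints "connector open after cancel: true"
        System.out.println("connector open after cancel: " + connectorOpenAfterCancel());
    }
}
```

`Future.cancel(true)` only interrupts the thread running the task. Nothing in it calls `shutdown()` on the connector, so in my case the old connector stayed registered under the same group id.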

    private static Map<String, ConsumerConnector> kafkaConsumersFuture;

Based on some parameter, find out the group id and store the connector under it:

    kafkaConsumersFuture.put(groupId, consumerConnector);

And while unsubscribing I did:

    ConsumerConnector consumerConnector = kafkaConsumersFuture.get(groupId);
    if (consumerConnector != null) {
        consumerConnector.shutdown();
        kafkaConsumersFuture.remove(groupId);
    }