I am using Kafka 0.10.0 and ZooKeeper 3.4.6 on my production server. I have 20 topics, each with approximately 50 partitions, and a total of 100 consumers, each subscribed to different topics and partitions. All the consumers share the same groupId. Is it the case that if a consumer is added or removed for a specific topic, the consumers attached to the other topics will also undergo rebalancing?

My consumer code (Groovy, using the old high-level consumer API) is:

public static void main(String[] args) {
        String groupId = "prod"
        String topicRegex = args[0]
        String consumerTimeOut = "10000"
        int n_threads = 1
        // Second argument selects the configuration environment; default to 'development'
        if (args && args.size() > 1) {
            ConfigLoader.init(args[1])
        }
        else {
            ConfigLoader.init('development')
        }
        // Third argument optionally sets the number of consumer threads
        if (args && args.size() > 2 && args[2].isInteger()) {
            n_threads = args[2].toInteger()
        }

        // One consumer task per thread; all tasks share the same group id
        ExecutorService executor = Executors.newFixedThreadPool(n_threads)
        addShutdownHook(executor)
        String zooKeeper = ConfigLoader.conf.zookeeper.hostName
        List<Runnable> taskList = []
        for (int i = 0; i < n_threads; i++) {
            KafkaConsumer example = new KafkaConsumer(zooKeeper, groupId, topicRegex, consumerTimeOut)
            taskList.add(example)
        }
        taskList.each { task ->
            executor.submit(task)
        }
        executor.shutdown()
        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)
    }

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId, String consumerTimeOut) {
        // Configuration for the old ZooKeeper-based high-level consumer
        Properties props = new Properties()
        props.put("zookeeper.connect", a_zookeeper)
        props.put("group.id", a_groupId)
        props.put("zookeeper.session.timeout.ms", "10000")
        props.put("rebalance.backoff.ms", "10000")
        props.put("zookeeper.sync.time.ms", "200")
        props.put("rebalance.max.retries", "10")
        props.put("enable.auto.commit", "false")
        props.put("consumer.timeout.ms", consumerTimeOut)
        props.put("auto.offset.reset", "smallest")
        return new ConsumerConfig(props)
    }

public void run(String topicRegex) {
        String threadName = Thread.currentThread().getName()
        logger.info("{} [{}] main Starting", TAG, threadName)
        // Create one stream per matching topic, filtered by the topic regex
        List<KafkaStream<byte[], byte[]>> streams = consumer.createMessageStreamsByFilter(new Whitelist(topicRegex), 1)

        for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator()
            List<Object> batchTypeObjList = []
            String topic
            String topicObjectType
            String method
            String className
            String deserializer
            Integer batchSize = 200
            while (true){
                boolean hasNext = false
                try {
                    hasNext = consumerIterator.hasNext()
                } catch (InterruptedException interruptedException) {
                    logger.error("{} [{}] Interrupted Exception: {}", TAG, threadName, interruptedException.getMessage())
                    throw interruptedException
                } catch(ConsumerTimeoutException timeoutException){
                    logger.error("{} [{}] Timeout Exception: {}", TAG, threadName, timeoutException.getMessage())
                    topicListMap.each{ eachTopic, value ->
                        batchTypeObjList = topicListMap.get(eachTopic)
                        if(batchTypeObjList != null && !batchTypeObjList.isEmpty()) {
                            def dbObject = topicConfigMap.get(eachTopic)
                            logger.debug("{} [{}] Timeout Happened.. Indexing remaining objects in list for topic: {}", TAG, threadName, eachTopic)
                            className = dbObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
                            method = dbObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
                            int sleepTime = 0
                            if(dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS) != null)
                                sleepTime = dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS)?.toInteger()
                            executeMethod(className, method, batchTypeObjList)
                            batchTypeObjList.clear()
                            topicListMap.put(eachTopic,batchTypeObjList)
                            sleep(sleepTime)
                        }
                    }
                    consumer.commitOffsets()
                    continue
                } catch(Exception exception){
                    logger.error("{} [{}]Exception: {}", TAG, threadName, exception.getMessage())
                    throw exception
                }
                if(hasNext) {
                    def consumerObj = consumerIterator.next()
                    logger.debug("{} [{}] partition name: {}", TAG, threadName, consumerObj.partition())
                    topic = consumerObj.topic()
                    DBObject dbObject = topicConfigMap.get(topic)
                    logger.debug("{} [{}] topic name: {}", TAG, threadName, topic)
                    topicObjectType = dbObject.get(KafkaTopicConfigEntity.TOPIC_OBJECT_TYPE_KEY)
                    deserializer = KafkaConfig.DEFAULT_DESERIALIZER
                    if (KafkaConfig.DESERIALIZER_MAP.containsKey(topicObjectType)) {
                        deserializer = KafkaConfig.DESERIALIZER_MAP.get(topicObjectType)
                    }
                    className = dbObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
                    method = dbObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
                    boolean isBatchJob = dbObject.get(KafkaTopicConfigEntity.IS_BATCH_JOB_KEY)
                    if(dbObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY) != null)
                        batchSize = dbObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY)
                    else
                        batchSize = 1
                    // Groovy dispatches this to a static deserialize() method on the resolved class
                    Object queueObj = Class.forName(deserializer).deserialize(consumerObj.message())
                    int sleepTime = 0
                    if(dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS) != null)
                        sleepTime = dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS)?.toInteger()
                    if (isBatchJob) {
                        batchTypeObjList = topicListMap.get(topic)
                        batchTypeObjList.add(queueObj)
                        if(batchTypeObjList.size() == batchSize) {
                            executeMethod(className, method, batchTypeObjList)
                            batchTypeObjList.clear()
                            sleep(sleepTime)
                        }
                        topicListMap.put(topic,batchTypeObjList)
                    } else {
                        executeMethod(className, method, queueObj)
                        sleep(sleepTime)
                    }
                    consumer.commitOffsets()
                }
            }
            logger.debug("{} [{}] Shutting Down Process ", TAG, threadName)
        }
    }

Any help will be appreciated.

Comment: Yes. If all consumer instances share the same group id, a change in the consumer count will trigger a rebalance for the entire group. – amethystic

1 Answer

Whenever a consumer leaves or joins a consumer group, the entire group undergoes a rebalance. Since the group tracks all partitions, across all topics, that its members are subscribed to, you are right in thinking that this can lead to rebalancing of consumers that are not subscribed to the topic in question.

Please see below for a small test illustrating this point. I have a broker with two topics, test1 (2 partitions) and test2 (9 partitions), and I start two consumers in the same consumer group, each subscribed to only one of the two topics. As the log output shows, when consumer2 joins the group, consumer1 has all its partitions revoked and reassigned, because the entire group rebalances.
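
The original test code is not reproduced here; the following is a minimal Groovy sketch of what such a test could look like, using the new consumer API's ConsumerRebalanceListener to print every revoke/assign callback. The helper name runConsumer, the group id testgroup, and the broker address localhost:9092 are illustrative assumptions, not part of the original test.

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Hypothetical helper: starts one consumer in group "testgroup" and logs every rebalance
def runConsumer(String name, String topic) {
    Properties props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // assumed broker address
    props.put("group.id", "testgroup")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    Thread.start {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)
        println "Subscribing ${name} to topic ${topic}"
        consumer.subscribe([topic], new ConsumerRebalanceListener() {
            void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                println "${name} got ${partitions.size()} partitions revoked!"
            }
            void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                println "${name} got ${partitions.size()} partitions assigned!"
            }
        })
        println "Starting thread for ${name}"
        while (true) {
            println "Polling ${name}"
            consumer.poll(1000)
        }
    }
}

runConsumer("consumer1", "test1")
sleep(20000) // let consumer1 settle before the second consumer joins the group
runConsumer("consumer2", "test2")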

Subscribing consumer1 to topic test1
Starting thread for consumer1
Polling consumer1
consumer1 got 0 partitions revoked!
consumer1 got 2 partitions assigned!
Polling consumer1
Polling consumer1
Polling consumer1
Subscribing consumer2 to topic test2
Starting thread for consumer2
Polling consumer2
Polling consumer1
consumer2 got 0 partitions revoked!
Polling consumer1
Polling consumer1
consumer1 got 2 partitions revoked!
consumer2 got 9 partitions assigned!
consumer1 got 2 partitions assigned!
Polling consumer2
Polling consumer1
Polling consumer2
Polling consumer1
Polling consumer2
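
As a practical consequence, if you do not want consumer changes on one topic to trigger rebalancing for consumers of other topics, give each topic (or each set of topics handled together) its own group.id; rebalancing is scoped to the consumer group, not to the topic.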