
Hi, I am new to Kafka. I am using Kafka 0.10.2 and ZooKeeper 3.4.9. I have a topic with two partitions and two consumers running. To increase processing speed I decided to increase the number of partitions to 10 so that I could also increase the number of consumers. So I ran this command:

./kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic --partitions 10

I then observed two strange things:

  1. My consumers were still attached to only the original two partitions; the remaining partitions had no consumers. (Expected behaviour: the two consumers should cover all 10 partitions between them.)

  2. Messages were only being pushed to the two old partitions; the new partitions were not receiving any messages. (Expected behaviour: messages should be distributed in a round-robin manner across all partitions.) A quick diagnostic sketch for both symptoms follows this list.
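Both symptoms suggest the clients are still working off stale cluster metadata. Below is a minimal diagnostic sketch, assuming it runs inside the KafkaPollingConsumer class shown later in this post (so logger, TAG and kafkaConsumer are in scope) and that "topic" is the topic name from the alter command. partitionsFor() returns the partitions present in the client's current metadata for the topic, so it shows whether this client has seen the new partitions yet.

// Diagnostic sketch: how many partitions does this client currently know about?
List<org.apache.kafka.common.PartitionInfo> partitionInfoList = kafkaConsumer.partitionsFor("topic")
logger.debug("{} Partitions visible to this client for topic 'topic': {}", TAG, partitionInfoList.size())
partitionInfoList.each { partitionInfo ->
    logger.debug("{} partition {} leader {}", TAG, partitionInfo.partition(), partitionInfo.leader())
}

If this reports 2 rather than 10, the client simply has not refreshed its metadata yet.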

I am using this command to see the partition assignments for the group:

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group topic-group

My Consumer Code:

class KafkaPollingConsumer implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaPollingConsumer.class)
        private static final String TAG = "[KafkaPollingConsumer]"
        private final KafkaConsumer<String, byte []> kafkaConsumer
        private Map<TopicPartition,OffsetAndMetadata> currentOffsetsMap = new HashMap<>()
        List topicNameList
        Map kafkaTopicConfigMap = new HashMap<String,Object>()
        Map kafkaTopicMessageListMap = new HashMap<String,List>()

        public KafkaPollingConsumer(String serverType, String groupName, String topicNameRegex){
            logger.debug("{} [Constructor] [Enter] Thread Name {} serverType group Name TopicNameRegex",TAG,Thread.currentThread().getName(),serverType,groupName,topicNameRegex)
            logger.debug("Populating Property for kafak consumer")
            Properties kafkaConsumerProperties = new Properties()
            kafkaConsumerProperties.put("group.id", groupName)
            kafkaConsumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
            kafkaConsumerProperties.put("value.deserializer", "com.custom.kafkaconsumer.deserializer.CustomObjectDeserializer")
            switch(serverType){
                case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Priority.toString() :
                    kafkaConsumerProperties.put("bootstrap.servers",ConfigLoader.conf.kafkaServer.priority.kafkaNode)
                    kafkaConsumerProperties.put("enable.auto.commit",ConfigLoader.conf.kafkaServer.priority.consumer.enable.auto.commit)
                    kafkaConsumerProperties.put("auto.offset.reset",ConfigLoader.conf.kafkaServer.priority.consumer.auto.offset.reset)
                    break
                case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Bulk.toString() :
                    kafkaConsumerProperties.put("bootstrap.servers",ConfigLoader.conf.kafkaServer.bulk.kafkaNode)
                    kafkaConsumerProperties.put("enable.auto.commit",ConfigLoader.conf.kafkaServer.bulk.consumer.enable.auto.commit)
                    kafkaConsumerProperties.put("auto.offset.reset",ConfigLoader.conf.kafkaServer.bulk.consumer.auto.offset.reset)
                    kafkaConsumerProperties.put("max.poll.records",10)
                    kafkaConsumerProperties.put("max.poll.interval.ms",900000)
                    kafkaConsumerProperties.put("request.timeout.ms",900000)
                    break
                default :
                    throw new IllegalArgumentException("Invalid server type")
            }
            logger.debug("{} [Constructor] KafkaConsumer Property Populated {}",properties.toString())
            kafkaConsumer = new KafkaConsumer<String, byte []>(kafkaConsumerProperties)
            topicNameList = topicNameRegex.split(Pattern.quote('|'))
            logger.debug("{} [Constructor] Kafkatopic List {}",topicNameList.toString())
            logger.debug("{} [Constructor] Exit",TAG)
        }

        private class HandleRebalance implements ConsumerRebalanceListener {
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            }

            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                if(currentOffsetsMap != null && !currentOffsetsMap.isEmpty()) {
                    logger.debug("{} In onPartitionsRevoked Rebalanced ",TAG)
                    kafkaConsumer.commitSync(currentOffsetsMap)
                }
            }
        }

        @Override
        void run() {
            logger.debug("{} Starting Thread ThreadName {}",TAG,Thread.currentThread().getName())
            populateKafkaConfigMap()
            initializeKafkaTopicMessageListMap()
            String topicName
            String consumerClassName
            String consumerMethodName
            Boolean isBatchJob
            Integer batchSize = 0
            final Thread mainThread = Thread.currentThread()
            Runtime.getRuntime().addShutdownHook(new Thread() {
                public void run() {
                    logger.error("{},gracefully shutdowning thread {}",TAG,mainThread.getName())
                    kafkaConsumer.wakeup()
                    try {
                        mainThread.join()
                    } catch (InterruptedException exception) {
                        logger.error("{} Error : {}",TAG,exception.getStackTrace().join("\n"))
                    }
                }
            })
            kafkaConsumer.subscribe(topicNameList , new HandleRebalance())
            try{
                while(true){
                    logger.debug("{} Starting Consumer with polling time in ms 100",TAG)
                    ConsumerRecords kafkaRecords = kafkaConsumer.poll(100)
                    for(ConsumerRecord record: kafkaRecords){
                        topicName = record.topic()
                        DBObject kafkaTopicConfigDBObject = kafkaTopicConfigMap.get(topicName)
                        consumerClassName = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
                        consumerMethodName = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
                        isBatchJob = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.IS_BATCH_JOB_KEY)
                        logger.debug("Details about Message")
                        logger.debug("Thread {}",mainThread.getName())
                        logger.debug("Topic {}",topicName)
                        logger.debug("Partition {}",record.partition().toString())
                        logger.debug("Offset {}",record.offset().toString())
                        logger.debug("clasName {}",consumerClassName)
                        logger.debug("methodName {}",consumerMethodName)
                        logger.debug("isBatchJob {}",isBatchJob.toString())
                        if(isBatchJob == true){
                            batchSize = Integer.parseInt(kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY).toString())
                            logger.debug("batchSize {}",batchSize.toString())
                        }
                        Object message = record.value()
                        logger.debug("message {}",message.toString())
                        publishMessageToConsumers(consumerClassName,consumerMethodName,isBatchJob,batchSize,message,topicName)
                        Thread.sleep(60000)
                        currentOffsetsMap.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() +1))
                    }
                    logger.debug("{} Commiting Messages to Kafka",TAG)
                    kafkaConsumer.commitSync(currentOffsetsMap)
                }
            }
            catch(InterruptException exception){
                logger.error("{} In InterruptException",TAG)
                logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n"))
            }
            catch (WakeupException exception) {
                logger.error("{} In WakeUp Exception",TAG)
                logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n"))
            }
            catch(Exception exception){
                logger.error("{} In Exception",TAG)
                logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n"))
            }
            finally {
                logger.error("{} In finally commiting remaining offset ",TAG)
                publishAllKafkaTopicBatchMessages()
                kafkaConsumer.commitSync(currentOffsetsMap)
                kafkaConsumer.close()
                logger.error("{} Exiting Consumer",TAG)
            }
        }


private void publishMessageToConsumers(String consumerClassName,String consumerMethodName,Boolean isBatchJob,Integer batchSize,Object message, String topicName){
    logger.debug("{} [publishMessageToConsumer] Enter",TAG)
    if(isBatchJob == true){
        publishMessageToBatchConsumer(consumerClassName, consumerMethodName,batchSize, message, topicName)
    }
    else{
        publishMessageToNonBatchConsumer(consumerClassName, consumerMethodName, message)
    }
    logger.debug("{} [publishMessageToConsumer] Exit",TAG)
}

private void publishMessageToNonBatchConsumer(String consumerClassName, String consumerMethodName, message){
    logger.debug("{} [publishMessageToNonBatchConsumer] Enter",TAG)
    executeConsumerMethod(consumerClassName,consumerMethodName,message)
    logger.debug("{} [publishMessageToNonBatchConsumer] Exit",TAG)
}

private void publishMessageToBatchConsumer(String consumerClassName, String consumerMethodName, Integer batchSize, Object message, String topicName){
    logger.debug("{} [publishMessageToBatchConsumer] Enter",TAG)
    List consumerMessageList = kafkaTopicMessageListMap.get(topicName)
    consumerMessageList.add(message)
    if(consumerMessageList.size() == batchSize){
        logger.debug("{} [publishMessageToBatchConsumer] Pushing Messages In Batches",TAG)
        executeConsumerMethod(consumerClassName, consumerMethodName, consumerMessageList)
        consumerMessageList.clear()
    }
    kafkaTopicMessageListMap.put(topicName,consumerMessageList)
    logger.debug("{} [publishMessageToBatchConsumer] Exit",TAG)
}

private void populateKafkaConfigMap(){
    logger.debug("{} [populateKafkaConfigMap] Enter",TAG)
    KafkaTopicConfigDBService kafkaTopicConfigDBService = KafkaTopicConfigDBService.getInstance()
    topicNameList.each { topicName ->
        DBObject kafkaTopicDBObject = kafkaTopicConfigDBService.findByTopicName(topicName)
        kafkaTopicConfigMap.put(topicName,kafkaTopicDBObject)
    }
    logger.debug("{} [populateKafkaConfigMap] kafkaConfigMap {}",TAG,kafkaTopicConfigMap.toString())
    logger.debug("{} [populateKafkaConfigMap] Exit",TAG)
}

private void initializeKafkaTopicMessageListMap(){
    logger.debug("{} [initializeKafkaTopicMessageListMap] Enter",TAG)
    topicNameList.each { topicName ->
        kafkaTopicMessageListMap.put(topicName,[])
    }
    logger.debug("{} [populateKafkaConfigMap] kafkaTopicMessageListMap {}",TAG,kafkaTopicMessageListMap.toString())
    logger.debug("{} [initializeKafkaTopicMessageListMap] Exit",TAG)
}

private void executeConsumerMethod(String className, String methodName, def messages){
    try{
        logger.debug("{} [executeConsumerMethod] Enter",TAG)
        logger.debug("{} [executeConsumerMethod] className  {} methodName {} messages {}",TAG,className,methodName,messages.toString())
        Class.forName(className)."$methodName"(messages)
    } catch (Exception exception){
        logger.error("{} [{}] Error while executing method : {} of class: {} with params : {} - {}", TAG, Thread.currentThread().getName(), methodName,
                className, messages.toString(), exception.getStackTrace().join("\n"))
    }
    logger.debug("{} [executeConsumerMethod] Exit",TAG)
}

private void publishAllKafkaTopicBatchMessages(){
    logger.debug("{} [publishAllKafkaTopicBatchMessages] Enter",TAG)
    String consumerClassName = null
    String consumerMethodName = null
    kafkaTopicMessageListMap.each { topicName,messageList ->
        DBObject kafkaTopicDBObject = kafkaTopicConfigMap.get(topicName)
        consumerClassName = kafkaTopicDBObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
        consumerMethodName = kafkaTopicDBObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
        logger.debug("{} Pushing message in topic {} className {} methodName {} ",TAG,topicName,consumerClassName,consumerMethodName)
        if(messageList != null && messageList.size() > 0){
            executeConsumerMethod(consumerClassName, consumerMethodName, messageList)
            messageList.clear()
            kafkaTopicMessageListMap.put(topicName,messageList)
        }
    }
    logger.debug("{} [publishAllKafkaTopicBatchMessages] Exit",TAG)
}
} // end of class KafkaPollingConsumer

Consumer Properties are :

auto.commit.interval.ms = 5000 
auto.offset.reset = earliest 
bootstrap.servers = [localhost:9092] 
check.crcs = true 
client.id = consumer-1 
connections.max.idle.ms = 540000 
enable.auto.commit = false 
exclude.internal.topics = true 
fetch.max.bytes = 52428800 
fetch.max.wait.ms = 500 
fetch.min.bytes = 1 
group.id = t1 
heartbeat.interval.ms = 3000 
interceptor.classes = null 
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 
max.partition.fetch.bytes = 1048576 
max.poll.interval.ms = 36000000 
max.poll.records = 10 
metadata.max.age.ms = 300000 
metric.reporters = [] 
metrics.num.samples = 2 
metrics.recording.level = INFO 
metrics.sample.window.ms = 30000 
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] 
receive.buffer.bytes = 65536 
reconnect.backoff.ms = 50 
request.timeout.ms = 36000000 
retry.backoff.ms = 100 
sasl.jaas.config = null 
sasl.kerberos.kinit.cmd = /usr/bin/kinit 
sasl.kerberos.min.time.before.relogin = 60000 
sasl.kerberos.service.name = null 
sasl.kerberos.ticket.renew.jitter = 0.05 
sasl.kerberos.ticket.renew.window.factor = 0.8 
sasl.mechanism = GSSAPI 
security.protocol = PLAINTEXT 
send.buffer.bytes = 131072 
session.timeout.ms = 10000 
ssl.cipher.suites = null 
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
ssl.endpoint.identification.algorithm = null 
ssl.key.password = null 
ssl.keymanager.algorithm = SunX509 
ssl.keystore.location = null 
ssl.keystore.password = null 
ssl.keystore.type = JKS 
ssl.protocol = TLS 
ssl.provider = null 
ssl.secure.random.implementation = null 
ssl.trustmanager.algorithm = PKIX 
ssl.truststore.location = null 
ssl.truststore.password = null 
ssl.truststore.type = JKS 
value.deserializer = class com.custom.kafkaconsumer.deserializer.CustomObjectDeserializer

Producer Code :

Properties kafkaProducerProperties = getKafkaProducerProperties(topicName)
if(kafkaProducerProperties != null){
    priorityKafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer<String, byte[]>(kafkaProducerProperties)
    ProducerRecord<String,byte []> record = new ProducerRecord<String,byte []>(topicName, messageMap)
    try {
        priorityKafkaProducer.send(record).get()
        priorityKafkaProducer.close()
    } catch (Exception e) {
        e.printStackTrace()
    }
}
else{
    throw new IllegalArgumentException("Invalid producer properties for " + topicName)
}

Producer Config :

acks = 1
    batch.size = 16384
    block.on.buffer.full = false
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.fetch.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    timeout.ms = 30000
    value.serializer = class com.abhimanyu.kafkaproducer.serializer.CustomObjectSerializer

Is the behaviour I am seeing expected, or am I missing something?


1 Answer


Did you wait for 5 minutes (or whatever the metadata refresh interval is configured to)?
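For context: both the producer and the consumer refresh their cached cluster metadata every metadata.max.age.ms, which defaults to 300000 ms (5 minutes) and is exactly the value shown in both config dumps in the question. Until that refresh happens (or the clients are restarted), neither client knows the topic now has 10 partitions, so the producer keeps writing to the original two and the group keeps its old assignment. A minimal sketch of lowering the interval on the producer side; the property names are standard Kafka client settings, and the serializer classes are taken from the producer config in the question:

Properties kafkaProducerProperties = new Properties()
kafkaProducerProperties.put("bootstrap.servers", "localhost:9092")
kafkaProducerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
kafkaProducerProperties.put("value.serializer", "com.abhimanyu.kafkaproducer.serializer.CustomObjectSerializer")
// Refresh cluster metadata every 30 seconds instead of the default 5 minutes,
// so newly added partitions are picked up sooner. The same property can also be
// set on the consumer.
kafkaProducerProperties.put("metadata.max.age.ms", 30000)

Once the consumers' metadata refresh picks up the new partition count, the group rebalances; with the RangeAssignor shown in the consumer config, the two consumers should then each be assigned five of the ten partitions.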