
I am using Spring with Kafka to consume data from Kafka topics. I have configured the concurrency as 10, so different threads poll the broker and process the messages. Even after a message has been processed successfully, the same message is later delivered to a different consumer thread. We are able to process the received messages within the configured max.poll.interval.ms=1500000.

Please find the configured Kafka consumer properties below. I have configured auto-commit through Kafka.

    group.id=ips-request-group // group id
    enable.auto.commit=true // auto commit
    auto.commit.interval.ms=60000 // auto commit interval
    session.timeout.ms=100000 // session timeout
    request.timeout.ms=610000 // request timeout
    fetch.max.wait.ms=500 // max fetch wait
    heartbeat.interval.ms=20000 // heartbeat interval
    auto.offset.reset=latest // consume latest messages
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer // key
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer // value
    max.poll.records=10 // max polling records
    max.poll.interval.ms=1500000 // max polling interval ms
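
For reference, a minimal sketch of how these properties might be wired into a Spring Kafka consumer factory in Java; the bootstrap servers value and the class/bean names are assumptions, not from the original post:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

    @Configuration
    public class ConsumerFactoryConfig {

        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            // Broker address is an assumption; use your cluster's value.
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "ips-request-group");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 60000);
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 100000);
            props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 610000);
            props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
            props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 20000);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1500000);
            return new DefaultKafkaConsumerFactory<>(props);
        }
    }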

Could you please help me resolve the duplicate messages being received by the Kafka consumer?

You need to provide much more information; how do you know you got duplicates? Did a rebalance occur for some other reason (e.g. another member joined)? I recommend never using auto-commit; it's more reliable for the listener container to do the commits in a more predictable fashion (after every RECORD or BATCH). – Gary Russell
Hi Gary, when a message arrives we validate whether the particular transaction already exists in the DB. If the transaction already exists, we insert a row into a tracker table with a description. Can't we commit the message offsets through the auto-commit interval? – Narayana Sadhu
Perhaps the problem is on the publishing side. Your consumer should store the partition and offset in the DB as well; I don't see how you can possibly get a duplicate if no rebalance occurred. – Gary Russell
Even though we are processing the messages within the configured time, we are still receiving the below message in the logs: WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Auto-commit of offsets failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. – Narayana Sadhu
Which means a rebalance did occur; you need to add de-duplication logic in your consumer to deal with such situations. You also need to reduce max.poll.records or increase max.poll.interval.ms. – Gary Russell
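
Following the de-duplication suggestion in the comments above, here is a minimal sketch of a listener that skips records whose (topic, partition, offset) has already been recorded. The topic name, the processed_offsets table, and the JdbcTemplate wiring are assumptions for illustration:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class DedupListener {

        private final JdbcTemplate jdbc;

        public DedupListener(JdbcTemplate jdbc) {
            this.jdbc = jdbc;
        }

        // Topic name is hypothetical; substitute your own.
        @KafkaListener(topics = "ips-request-topic", groupId = "ips-request-group")
        public void listen(ConsumerRecord<String, String> record) {
            // Check whether this exact record was already processed.
            Integer seen = jdbc.queryForObject(
                    "SELECT COUNT(*) FROM processed_offsets WHERE topic = ? AND partition_no = ? AND offset_no = ?",
                    Integer.class, record.topic(), record.partition(), record.offset());
            if (seen != null && seen > 0) {
                return; // duplicate redelivery after a rebalance; already handled
            }
            process(record.value());
            // Record the offset so a redelivered duplicate is skipped next time.
            jdbc.update("INSERT INTO processed_offsets (topic, partition_no, offset_no) VALUES (?, ?, ?)",
                    record.topic(), record.partition(), record.offset());
        }

        private void process(String message) {
            // existing business logic (validate transaction, insert tracker row, ...)
        }
    }

Running the check-then-insert and the business logic inside one database transaction would make the de-duplication atomic across threads.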

1 Answer


To commit the offset after each record is processed, set enable.auto.commit=false and set the container property ackMode to AckMode.RECORD.
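
A minimal sketch of that container configuration, assuming a recent spring-kafka version (bean and class names are illustrative, and the injected consumer factory must be built with enable.auto.commit=false):

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.listener.ContainerProperties.AckMode;

    @Configuration
    @EnableKafka
    public class KafkaListenerConfig {

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
                ConsumerFactory<String, String> consumerFactory) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory);
            factory.setConcurrency(10); // as in the question
            // Commit the offset after each record is processed.
            factory.getContainerProperties().setAckMode(AckMode.RECORD);
            return factory;
        }
    }

With AckMode.RECORD the container commits after every record, so a rebalance can replay at most the records currently in flight, rather than everything processed since the last auto-commit interval.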