The Kafka consumer in our Java application re-consumed messages from its topic, and we don't know why.
A producer sends messages to a topic that has four partitions.
We have one consumer that consumes messages from this topic. The application runs all the time on weekdays but not on weekends: it does not call the poll method during the weekend.
Consumer config: auto commit is enabled, and the auto-commit interval is 5 s (the default).
The application ran fine until a Sunday, when it resumed calling the poll method. We saw millions of messages polled from the topic; the consumer was essentially polling every message in the topic. Compared with the offsets from before it stopped for the weekend, the new offsets were much smaller, as if they had been reset to very low numbers for all four partitions.
We don't know what happened on the consumer side: it was not calling the poll method, so no log messages were printed. We checked the Kafka server log but found nothing.
Has anyone seen this before? Could the consumer be misconfigured?
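To make the offset comparison concrete, here is a minimal sketch of the kind of check described above: given per-partition offsets recorded before the weekend and after resuming, it reports which partitions moved backwards and by how much. The class name `OffsetRegressionCheck`, the method `regressions`, and the sample numbers are all hypothetical; this is not code from our application.

```java
import java.util.Map;
import java.util.TreeMap;

public class OffsetRegressionCheck {

    // Returns, for each partition whose offset went backwards,
    // the number of offsets it regressed by (before - after).
    public static Map<Integer, Long> regressions(Map<Integer, Long> before,
                                                 Map<Integer, Long> after) {
        Map<Integer, Long> result = new TreeMap<>();
        for (Map.Entry<Integer, Long> entry : before.entrySet()) {
            Long now = after.get(entry.getKey());
            // Only report partitions that are present in both maps and moved backwards.
            if (now != null && now < entry.getValue()) {
                result.put(entry.getKey(), entry.getValue() - now);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up offsets for a four-partition topic, for illustration only.
        Map<Integer, Long> before = new TreeMap<>();
        Map<Integer, Long> after = new TreeMap<>();
        for (int partition = 0; partition < 4; partition++) {
            before.put(partition, 1_000_000L + partition);
            after.put(partition, 10L + partition);
        }
        System.out.println("Partitions that regressed: " + regressions(before, after));
    }
}
```

In our case every one of the four partitions would show up in the result.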
<spring.kafka.version>1.1.2.RELEASE</spring.kafka.version>
...
<bean id="defaultKafkaConsumer"
class="org.apache.kafka.clients.consumer.KafkaConsumer">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${kafka.bootstrap.servers}"></entry>
<entry key="max.block.ms" value="5000"></entry>
<entry key="group.id" value="kafkaconnect.tca"></entry>
<entry key="auto.offset.reset" value="earliest"></entry>
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"></entry>
<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"></entry>
</map>
</constructor-arg>
</bean>
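The bean above does not set the auto-commit properties, so the client defaults apply. As a minimal sketch, the same settings with those defaults written out would look roughly like this in plain Java. The class name `ConsumerConfigSketch` and the `localhost:9092` address are assumptions for illustration only; `enable.auto.commit=true` and `auto.commit.interval.ms=5000` are the documented consumer defaults, not values taken from our files.

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    // Builds the same settings as the XML map, with the auto-commit defaults made explicit.
    public static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("max.block.ms", "5000");
        props.setProperty("group.id", "kafkaconnect.tca");
        // Only takes effect when the group has no valid committed offset for a partition.
        props.setProperty("auto.offset.reset", "earliest");
        // Defaults that the XML map leaves implicit.
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "5000");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address, for illustration only.
        Properties props = build("localhost:9092");
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + " = " + props.getProperty(name));
        }
    }
}
```

With auto commit, offsets are committed from inside `poll`, so nothing is committed while the application is not polling.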
getKafkaConsumer().subscribe(Arrays.asList(getKafkaTopic()));
// set up the polling task
handler = timer.scheduleAtFixedRate(new Runnable() {
    public void run() {
        try {
            processPoll();
        } catch (Throwable t) {
            LOG.error(String.format("error processing poll for inet: %s, details: %s - %s", getId(), t.getMessage(), t.getCause()), t);
        }
    }
}, 3, 3, TimeUnit.MILLISECONDS);
The processPoll() method (the destination is not ready during the weekend, so it returns before calling poll):
try {
    // Skip polling while the destination is not ready (e.g. over the weekend).
    if (!isDestinationReady()) {
        if (destinationIgnoreCnt++ == 0) {
            LOG.warn(String.format("outbound destination session is not ready - trying: %s/%s", destinationIgnoreCnt, destinationwaitingloop));
        } else if ((destinationIgnoreCnt++ % destinationwaitingloop) == 0) {
            LOG.warn(String.format("outbound destination session is not ready - trying %s/%s", destinationIgnoreCnt, destinationwaitingloop));
            destinationIgnoreCnt = 1;
        }
        messageIgnoreCnt = 0;
        return;
    }
    // Skip polling while the destination is not open.
    if (!isDestinationOpen()) {
        if (destinationIgnoreCnt++ == 0) {
            LOG.error(String.format("outbound destination is not opened - trying:%s/%s.", destinationIgnoreCnt, destinationwaitingloop));
        } else if ((destinationIgnoreCnt++ % destinationwaitingloop) == 0) {
            LOG.error(String.format("outbound destination is not opened - trying %s/%s.", destinationIgnoreCnt, destinationwaitingloop));
            destinationIgnoreCnt = 1;
        }
        messageIgnoreCnt = 0;
        return;
    }
    // Log that the poller is running, then periodically afterwards.
    if (messageIgnoreCnt++ == 0) {
        LOG.info(String.format("kafka poller started. poll interval %s wait: %s", pollingInterval, 60000));
    } else if ((messageIgnoreCnt++ % 30) == 0) { // approximately 30mins
        LOG.info(String.format("kafka poller started. poll interval %s wait %s", pollingInterval, 60000));
        messageIgnoreCnt = 1;
    }
    if (getKafkaConsumer() == null) {
        LOG.critical("KafkaListener consumer is null");
        return;
    }
    // Block for up to 60 seconds waiting for records.
    ConsumerRecords<String, String> records = getKafkaConsumer().poll(60000);
    if (records == null || records.isEmpty()) {
        LOG.debug("zero records received from Kafka");
        return;
    }
    for (ConsumerRecord<String, String> record : records) {
        LOG.info(String.format("consuming from topic = %s ", record.toString()));
        try {
            // Deserialize the JSON payload and hand it off for processing.
            String jsonMsg = record.value();
            DirectBatchRequest payload = JacksonUtilities.getObjectMapper().readValue(jsonMsg, DirectBatchRequest.class);
            if (payload != null) {
                LOG.info(String.format("Got it reportId:%s", payload.getDestinationId()));
                if (payload.getDestinationId() == 0) {
                    LOG.info(String.format("Applying default destination desk:%s", defaultDeskId));
                    payload.setDestinationId(defaultDeskId);
                }
                List<RequestEntryType> requestEntryTypeList = ((StreamDirectRequest) payload).getRequestList();
                LOG.info(String.format("Processing size: %s", requestEntryTypeList.size()));
                processRequest((StreamDirectRequest) payload); // async call
                LOG.info(String.format("Processing size: %s sent to Steam", requestEntryTypeList.size()));
            }
        } catch (Throwable t) {
            LOG.error(String.format("KafkaListener JSON%s conversion error %s", record, t.getMessage()));
        }
    }
} catch (Throwable t) {
    LOG.error(String.format("KafkaListener exception %s", t.getMessage()));
}
getKafkaConsumer method? – kellanburket
public KafkaConsumer<String, String> getKafkaConsumer() { return kafkaConsumer; } – iampolo