0
votes

The Kafka consumer in our Java application was unexpectedly re-consuming messages from the topic.

A producer sends messages to a topic that has four partitions, and a single consumer consumes from this topic. The application runs continuously on weekdays but not on weekends: it does not call the poll method during the weekend.
Consumer config: auto commit enabled, with the default auto commit interval of 5 seconds.

The application ran fine until one Sunday, when it resumed calling the poll method. We saw millions of messages polled from the topic; the consumer was essentially re-polling everything in it. Comparing the new offsets with the offsets recorded before the weekend, the new offsets were much smaller, as if they had been reset to very low numbers for all four partitions.

We don't know what happened on the consumer side while it was not calling the poll method, since no log messages were printed during that time. We also checked the Kafka server logs but found nothing.

Has anyone seen this before? Could the consumer be misconfigured?

    <spring.kafka.version>1.1.2.RELEASE</spring.kafka.version>
     ... 
  <bean id="defaultKafkaConsumer"
        class="org.apache.kafka.clients.consumer.KafkaConsumer">
        <constructor-arg>  
            <map>
                <entry key="bootstrap.servers"  value="${kafka.bootstrap.servers}"></entry>
                <entry key="max.block.ms"  value="5000"></entry>
                <entry key="group.id"  value="kafkaconnect.tca"></entry>
                <entry key="auto.offset.reset" value="earliest"></entry>
                <entry key="key.deserializer"  value="org.apache.kafka.common.serialization.StringDeserializer"></entry>
                <entry key="value.deserializer"  value="org.apache.kafka.common.serialization.StringDeserializer"></entry>
            </map>  
        </constructor-arg>  
    </bean>
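For reference, the XML bean above corresponds to roughly the following Java properties. This is only a sketch: `enable.auto.commit` is unset in the XML and therefore defaults to `true`, and the 5-second commit interval is the default mentioned in the question; the bootstrap address is a placeholder.

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // Mirrors the XML bean; all values come from the question's config,
    // plus the auto-commit defaults it relies on.
    public static Properties consumerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);  // placeholder
        props.put("group.id", "kafkaconnect.tca");
        props.put("enable.auto.commit", "true");           // default: true
        props.put("auto.commit.interval.ms", "5000");      // default: 5 s
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```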

    getKafkaConsumer().subscribe(Arrays.asList(getKafkaTopic()));
    // set up the polling task
    handler = timer.scheduleAtFixedRate(new Runnable() {
        public void run() {
            try {
                processPoll();
            } catch (Throwable t) {
                LOG.error(String.format("error processing poll for inet: %s, details: %s - %s",
                        getId(), t.getMessage(), t.getCause()), t);
            }
        }
    }, 3, 3, TimeUnit.MILLISECONDS);



   The processPoll() method (the destination will not be ready during the weekend):
   try {
            if (!isDestinationReady()) {
                if (destinationIgnoreCnt++ ==0) {
                    LOG.warn(String.format("outbound destination session is not ready - trying: %s/%s",destinationIgnoreCnt,destinationwaitingloop));
                } else if ((destinationIgnoreCnt++ % destinationwaitingloop) == 0) {
                    LOG.warn(String.format("outbound destination session is not ready - trying %s/%s", destinationIgnoreCnt,destinationwaitingloop));
                    destinationIgnoreCnt = 1;
                }
                messageIgnoreCnt = 0;
                return;
            }
            if (!isDestinationOpen()) {
                if (destinationIgnoreCnt++ == 0) {
                    LOG.error(String.format("outbound destination is not opened - trying: %s/%s.", destinationIgnoreCnt, destinationwaitingloop));
                } else if ((destinationIgnoreCnt++ % destinationwaitingloop) == 0) {
                    LOG.error(String.format("outbound destination is not opened - trying %s/%s.", destinationIgnoreCnt, destinationwaitingloop));
                    destinationIgnoreCnt = 1;
                }
                messageIgnoreCnt = 0;
                return;
            }

            if (messageIgnoreCnt++ == 0) {
                LOG.info(String.format("kafka poller started. poll interval %s wait: %s", pollingInterval, 60000));
            } else if ((messageIgnoreCnt++ % 30) == 0) {// approximately 30mins
                LOG.info(String.format("kafka poller started. poll interval %s wait %s", pollingInterval, 60000));
                messageIgnoreCnt = 1;
            }

            if (getKafkaConsumer() == null) {
                LOG.critical("KafkaListener consumer is null");
                return;
            }

            ConsumerRecords<String, String> records = getKafkaConsumer().poll(60000);
            if (records == null || records.isEmpty()) {
                LOG.debug("zero records received from Kafka");
                return;
            }
            for (ConsumerRecord<String, String> record : records) {
                LOG.info(String.format("consuming from topic =  %s ", record.toString()));
                try {
                    String jsonMsg = record.value();

                    DirectBatchRequest payload = JacksonUtilities.getObjectMapper().readValue(jsonMsg, DirectBatchRequest.class);

                    if (payload != null) {
                        LOG.info(String.format("Got it reportId:%s", payload.getDestinationId()));
                        if(payload.getDestinationId() == 0) {
                            LOG.info(String.format("Applying default destination desk:%s", defaultDeskId));
                            payload.setDestinationId(defaultDeskId);
                        }
                        List<RequestEntryType> requestEntryTypeList = ((StreamDirectRequest) payload).getRequestList();
                        LOG.info(String.format("Processing size: %s" , requestEntryTypeList.size()) );
                        processRequest((StreamDirectRequest) payload);  //async call
                        LOG.info(String.format("Processing size: %s sent to Steam" , requestEntryTypeList.size()) );    
                    }
                } catch (Throwable t) {
                    LOG.error(String.format("KafkaListener JSON%s conversion error %s", record, t.getMessage()));
                }

            }

        } catch (Throwable t) {
            LOG.error(String.format("KafkaListener exception %s", t.getMessage()));

        }
Comments:

There isn't enough information here. Please post the relevant code from the consumer. – kellanburket
Code pieces are added. – iampolo
Do you have documentation for this getKafkaConsumer method? – kellanburket
It is just getting the injected KafkaConsumer bean: public KafkaConsumer<String, String> getKafkaConsumer() { return kafkaConsumer; } – iampolo
Is this program running at all times, or does it shut down when idle? – kellanburket

1 Answer

1
votes

By default, Kafka removes committed offsets after offsets.retention.minutes if there is no activity from the consumer group. The default retention period is 1440 minutes (1 day).

In your case, since the consumer group was down for the whole weekend, the committed offsets expired and were deleted. When the consumer resumed polling, it found no committed offsets, and because auto.offset.reset is set to earliest, it started reading again from the earliest available offset in each partition.

See https://kafka.apache.org/documentation/#brokerconfigs
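If the consumer really must stay idle over weekends, one option is to raise the offset retention on the broker so that committed offsets outlive the idle gap. A sketch, with an illustrative value:

```properties
# broker server.properties
# Committed offsets are discarded this many minutes after the consumer
# group goes inactive; the default of 1440 (1 day) is shorter than a weekend.
offsets.retention.minutes=4320
```

Alternatively, with auto-commit enabled, a consumer that keeps polling (even if it discards the records it receives) re-commits its offsets periodically, which refreshes the retention window and prevents expiry.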