2 votes

I am using Kafka (kafka_2.12-2.1.0) with Spring Kafka on the client side, and I have got stuck with an issue.

I need to load an in-memory map by reading all the existing messages in a Kafka topic. I did this by starting a new consumer (with a unique consumer group id and the offset reset set to earliest) and then iterating over the consumer (the poll method) to get all the messages, stopping when the consumer records come back empty.
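
Roughly what my loading code looks like (a simplified sketch; the topic name, group id and String serdes here are just placeholders):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

public class MapLoader {

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "map-loader-" + UUID.randomUUID()); // unique consumer group id
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        final Map<String, String> cache = new HashMap<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) {
                    break; // problem: the first few polls come back empty, so this exits too early
                }
                records.forEach(r -> cache.put(r.key(), r.value()));
            }
        }
        System.out.println("Loaded " + cache.size() + " entries");
    }
}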

But I noticed that when I start polling, the first few iterations return the consumer records as empty, and only then does it start returning the actual records. This breaks my logic, as our code concludes there are no records in the topic.

I have tried a few other ways (for example, using offset numbers), but I haven't been able to come up with any solution, apart from keeping another record somewhere that tells me how many messages there are in the topic that need to be read before I stop.

Any ideas, please?

3 Answers

2 votes

To my understanding, what you are trying to achieve is to have a map constructed in your application based on the values that are already in a specific topic.

For this task, instead of manually polling the topic, you can use a KTable in the Kafka Streams DSL, which will automatically construct a readable key-value store that is fault tolerant, has replication enabled, and is automatically filled with new values.

You can do this simply by calling groupByKey on a stream and then calling aggregate.

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> myKStream = builder.stream("topic_name", Consumed.with(Serdes.String(), Serdes.Long()));
KTable<String, Long> totalCount = myKStream.groupByKey().aggregate(this::initializer, this::aggregator);

(The actual code may vary depending on the Kafka version, your configuration, etc.)
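
Not the exact code for your setup, but a minimal self-contained sketch (written against the 2.1 Streams API; the application id, topic name and store name are placeholders, and the aggregator simply keeps the latest value per key) of materializing the aggregate as a named store and then reading it like a map:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import java.util.Properties;

public class TopicTableExample {

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topic-table-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        final StreamsBuilder builder = new StreamsBuilder();

        // groupByKey + aggregate, materialized as a named, queryable state store.
        final KTable<String, Long> table = builder
                .stream("topic_name", Consumed.with(Serdes.String(), Serdes.Long()))
                .groupByKey()
                .aggregate(
                        () -> 0L,                          // initializer
                        (key, value, aggregate) -> value,  // aggregator: keep the latest value per key
                        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("my-store")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.Long()));

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Once the instance reaches RUNNING (retry on InvalidStateStoreException while it is starting up),
        // the store can be read like a read-only map that Kafka keeps filled for you.
        final ReadOnlyKeyValueStore<String, Long> store =
                streams.store("my-store", QueryableStoreTypes.keyValueStore());
        System.out.println(store.get("some-key"));
    }
}

Because Streams keeps the materialized store up to date (and restores it on restart), there is no separate "read everything first, then stop" step in your own code.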

Read more about Kafka Streams concepts here.

"Then I iterate over the consumer (poll method) to get all messages and stop when the consumer records become empty"

Kafka is a message streaming platform. Any data you stream is being updated continuously, and you probably should not use it in a way that expects the consuming to stop after a certain number of messages. How will you handle it if a new message comes in after you stop the consumer?

Also, the reason you are getting empty records is probably related to the records being in different partitions, etc.

What is your specific use case here? There might be a good way to do it with Kafka's own semantics.

0 votes

You have to use two consumers: one to load the offsets and another one to read all the records.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

public class KafkaRecordReader {

    static final Map<String, Object> props = new HashMap<>();
    static {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "sample-client");
    }

    public static void main(String[] args) {
        final Map<TopicPartition, OffsetInfo> partitionOffsetInfos = getOffsets(Arrays.asList("world", "sample"));
        final List<ConsumerRecord<byte[], byte[]>> records = readRecords(partitionOffsetInfos);

        System.out.println(partitionOffsetInfos);
        System.out.println("Read : " + records.size() + " records");
    }

    private static List<ConsumerRecord<byte[], byte[]>> readRecords(final Map<TopicPartition, OffsetInfo> offsetInfos) {
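        // Polls every assigned partition from its captured beginning offset until its captured end offset and returns all records read.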
        final Properties readerProps = new Properties();
        readerProps.putAll(props);
        readerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "record-reader");

        final Map<TopicPartition, Boolean> partitionToReadStatusMap = new HashMap<>();
        offsetInfos.forEach((tp, offsetInfo) -> {
            partitionToReadStatusMap.put(tp, offsetInfo.beginOffset == offsetInfo.endOffset);
        });

        final List<ConsumerRecord<byte[], byte[]>> cachedRecords = new ArrayList<>();
        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(readerProps)) {
            consumer.assign(offsetInfos.keySet());
            for (final Map.Entry<TopicPartition, OffsetInfo> entry : offsetInfos.entrySet()) {
                consumer.seek(entry.getKey(), entry.getValue().beginOffset);
            }

            boolean close = false;
            while (!close) {
                final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(100));
                for (final ConsumerRecord<byte[], byte[]> record : consumerRecords) {
                    cachedRecords.add(record);
                    final TopicPartition currentTp = new TopicPartition(record.topic(), record.partition());
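                    // The end offset is the offset where the next record would be written, so the last existing record sits at endOffset - 1.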
                    if (record.offset() + 1 == offsetInfos.get(currentTp).endOffset) {
                        partitionToReadStatusMap.put(currentTp, true);
                    }
                }

                boolean done = true;
                for (final Map.Entry<TopicPartition, Boolean> entry : partitionToReadStatusMap.entrySet()) {
                    done &= entry.getValue();
                }
                close = done;
            }
        }
        return cachedRecords;
    }

    private static Map<TopicPartition, OffsetInfo> getOffsets(final List<String> topics) {
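        // Uses a separate consumer to look up the beginning and end offsets of every partition of the given topics.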
        final Properties offsetReaderProps = new Properties();
        offsetReaderProps.putAll(props);
        offsetReaderProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "offset-reader");

        final Map<TopicPartition, OffsetInfo> partitionOffsetInfo = new HashMap<>();
        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(offsetReaderProps)) {
            final List<PartitionInfo> partitionInfos = new ArrayList<>();
            topics.forEach(topic -> partitionInfos.addAll(consumer.partitionsFor(topic)));
            final Set<TopicPartition> topicPartitions = partitionInfos
                    .stream()
                    .map(x -> new TopicPartition(x.topic(), x.partition()))
                    .collect(Collectors.toSet());
            consumer.assign(topicPartitions);
            final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
            final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);

            for (final TopicPartition tp : topicPartitions) {
                partitionOffsetInfo.put(tp, new OffsetInfo(beginningOffsets.get(tp), endOffsets.get(tp)));
            }
        }
        return partitionOffsetInfo;
    }

    private static class OffsetInfo {

        private final long beginOffset;
        private final long endOffset;

        private OffsetInfo(long beginOffset, long endOffset) {
            this.beginOffset = beginOffset;
            this.endOffset = endOffset;
        }

        @Override
        public String toString() {
            return "OffsetInfo{" +
                    "beginOffset=" + beginOffset +
                    ", endOffset=" + endOffset +
                    '}';
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            OffsetInfo that = (OffsetInfo) o;
            return beginOffset == that.beginOffset &&
                    endOffset == that.endOffset;
        }

        @Override
        public int hashCode() {
            return Objects.hash(beginOffset, endOffset);
        }
    }
}

0 votes

Adding to the above answer from @arshad, the reason you are not getting the records is that you have already read them. See this answer here: earliest or latest does not matter on the consumer once you have a committed offset for the partition.

I would use a seek to the beginning, or to a particular offset if you knew the starting offset.
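
For example (a minimal sketch with a placeholder topic name; since the partitions are assigned manually, no consumer group offsets come into play):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

public class SeekToBeginningExample {

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Manually assign every partition of the topic, then rewind to the beginning.
            final List<TopicPartition> partitions = consumer.partitionsFor("my-topic").stream()
                    .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
                    .collect(Collectors.toList());
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);
            // or, if you already know the starting offset for a partition:
            // consumer.seek(partitions.get(0), knownOffset);

            // ...poll() as usual from here on
        }
    }
}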