
I am attempting to use kafka-node to read compacted messages from a kafka topic.

The problem is that recently produced messages are left above the end of the log (EOL) and are not reachable until additional messages are produced. Effectively there is a gap between the EOL and the high water offset which prevents reading of the latest messages. It's not clear why this is.

A topic has been created with

kafka-topics.sh --zookeeper ${KAFKA_HOST}:2181 --create --topic atopic --config "cleanup.policy=compact" --config "delete.retention.ms=100" --config "segment.ms=100" --config "min.cleanable.dirty.ratio=0" --partitions 1 --replication-factor 1

A number of key values are produced into the topic. Some of the keys were the same.

var kafka = require('kafka-node');
var HighLevelProducer = kafka.HighLevelProducer;

var client = new kafka.KafkaClient({ kafkaHost: "<host:port>", autoConnect: true });
var producer = new HighLevelProducer(client);

// send() is wrapped in a Promise; res is the Promise's resolve function
new Promise(function (res) {
  producer.send(payload, function (error, result) {
    debug('Sent payload to Kafka: ', payload);
    if (error) {
      console.error(error);
    } else {
      res(true);
    }
    client.close();
  });
});
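The payload passed to send() is not shown in the question. Assuming kafka-node's KeyedMessage (a real kafka-node export) and the { name, url } value shape visible in the consumer output below, it might be built like this — buildPayload is a hypothetical helper, not part of the original code:

```javascript
// Hypothetical helper: build a kafka-node producer payload from (key, value)
// pairs, keyed with kafka.KeyedMessage so compaction can act per key.
function buildPayload(kafka, topic, pairs) {
  return [{
    topic: topic,
    messages: pairs.map(function (pair) {
      var key = pair[0], url = pair[1];
      // Value shape inferred from the consumer output in the question
      return new kafka.KeyedMessage(key, JSON.stringify({ name: key, url: url }));
    })
  }];
}
```

It would be called as producer.send(buildPayload(kafka, 'atopic', [['key', '1'], ['key2', '1']]), callback).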

Here are the keys and values inserted

key - 1
key2 - 1
key3 - 1
key - 2
key2 - 2
key3 - 2
key1 - 3
key - 3
key2 - 3
key3 - 3

Then the set of topic keys was requested.

var options = {
  id: 'consumer1',
  kafkaHost: "<host:port>",
  groupId: "consumergroup1",
  sessionTimeout: 15000,
  protocol: ['roundrobin'],
  fromOffset: 'earliest'
};

var consumerGroup = new ConsumerGroup(options, topic);
consumerGroup.on('error', onError);
consumerGroup.on('message', onMessage);
consumerGroup.on('done', function(message) {
  consumerGroup.close(true, function() { });
});

function onError(error) {
  console.error(error);
}

function onMessage(message) {
  console.log('%s read msg Topic="%s" Partition=%s Offset=%d HW=%d', this.client.clientId, message.topic, message.partition, message.offset, message.highWaterOffset, message.value);
}
The results are surprising:
consumer1 read msg Topic="atopic" Partition=0 Offset=4 highWaterOffset=10 Key=key2 value={"name":"key2","url":"2"}
consumer1 read msg Topic="atopic" Partition=0 Offset=5 highWaterOffset=10 Key=key3 value={"name":"key3","url":"2"}
consumer1 read msg Topic="atopic" Partition=0 Offset=6 highWaterOffset=10 Key=key1 value={"name":"key1","url":"3"}
consumer1 read msg Topic="atopic" Partition=0 Offset=7 highWaterOffset=10 Key=key value={"name":"key","url":"3"}
consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=
consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=
consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=
consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=

The high water offset reports the latest offset as 10, yet the highest offset the consumer actually sees is only 7. Somehow the compaction prevents the consumer from seeing the latest messages.

It's not clear how to avoid this constraint and allow the consumer to see the latest messages.

Any suggestions appreciated. Thanks.

Your code doesn't match the output... Does the console consumer also show this problem? – OneCricketeer
I do get the same problem using the Consumer rather than the ConsumerGroup (if that is your question). @cricket_007 – Phil Tomlinson
No... Your code says HW=%d, and your output says highWaterOffset. If you have questions about the output of the code, it would make sense if you showed the code that generated it. In any case, the topic is compacted, and so you get to offset 10 by the producer, but there are only 7 values on the topic itself. – OneCricketeer

2 Answers


Somehow the compaction prevents the consumer from seeing the latest messages.

Yes, you are missing a few messages, but you are also seeing others.

Compaction is removing the earlier keys.

Notice how there are no "url":"1" values at all:

Key=key2 value={"name":"key2","url":"2"}
Key=key3 value={"name":"key3","url":"2"}
Key=key1 value={"name":"key1","url":"3"}
Key=key value={"name":"key","url":"3"}

That is because you sent new values for the same key.

And you sent 10 messages, so the high water offset for the topic is 10.

Your code doesn't necessarily look wrong, but you should have received two more url 3 values. The offsets that get printed correspond to this assignment:

key  - 1 | 0
key2 - 1 | 1
key3 - 1 | 2
key  - 2 | 3
key2 - 2 | 4
key3 - 2 | 5
key1 - 3 | 6
key  - 3 | 7
key2 - 3 | 8
key3 - 3 | 9
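If the log had been fully compacted, only the last record for each key would survive, and each survivor would keep its original offset. A minimal model of that retention rule (plain JavaScript, no Kafka involved — the compact function is illustrative only):

```javascript
// Model of log compaction: for each key keep only the record with the
// highest offset; surviving records keep their original offsets.
function compact(records) {
  var latest = new Map();
  records.forEach(function (r) { latest.set(r.key, r); }); // later offsets win
  return Array.from(latest.values()).sort(function (a, b) {
    return a.offset - b.offset;
  });
}
```

Applied to the ten records above, this keeps offsets 6, 7, 8 and 9. The question's output shows 4 through 7 instead, consistent with the point that the two remaining url 3 values never arrived.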

Generally, I would suggest not having Kafka compact the topic and roll log segments ten times a second, and I would also suggest trying a different library such as node-rdkafka.


After working a bit more with Kafka, it seems that the kafka-node API has the following behaviour (which I think actually derives from Kafka itself).

When messages are queried from before the highWaterOffset, only messages up to the highWaterOffset are returned to the ConsumerGroup. This makes sense if the messages beyond it have not been replicated, because another consumer in the group would not necessarily see them.

It is still possible to request and receive messages beyond the highWaterOffset using a Consumer rather than a ConsumerGroup and by querying a specific partition.
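A sketch of that approach, assuming kafka-node's plain Consumer with the fromOffset: true option (the readWholePartition wrapper is hypothetical, and the kafka module is passed in as a parameter purely to keep the sketch self-contained):

```javascript
// Sketch: read one specific partition from offset 0 with the plain
// Consumer which, per the observation above, can fetch past the point
// where the ConsumerGroup stops.
function readWholePartition(kafka, host, topic, partition, onMessage) {
  var client = new kafka.KafkaClient({ kafkaHost: host });
  var consumer = new kafka.Consumer(
    client,
    [{ topic: topic, partition: partition, offset: 0 }],
    { fromOffset: true } // honour the explicit starting offset above
  );
  consumer.on('message', onMessage);
  return consumer;
}
```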

Also, the 'done' event seems to fire when the offset is not necessarily at the latestOffset. In that case it is necessary to submit a further query starting at message.offset + 1. If you keep doing this you can get all messages up to the latestOffset.
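That retry loop might look like the following sketch, assuming a consumer exposing setOffset(topic, partition, offset) (which kafka-node's Consumer provides) and a known latestOffset; the consumeUntil wrapper is hypothetical:

```javascript
// Sketch: when 'done' fires before latestOffset is reached, re-query
// from one past the last offset seen, until all messages have arrived.
function consumeUntil(consumer, topic, partition, latestOffset, onMessage) {
  var lastSeen = -1;
  consumer.on('message', function (m) {
    lastSeen = m.offset;
    onMessage(m);
  });
  consumer.on('done', function () {
    if (lastSeen + 1 < latestOffset) {
      consumer.setOffset(topic, partition, lastSeen + 1); // ask again past the gap
    }
  });
}
```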

It is not clear to me why Kafka has this behaviour, but there is probably some lower-level detail that explains it.