1
votes

My Node.js application uses the kafka-node module.

I have a Kafka topic with three partitions, as shown below:

Topic: NotifierTemporary        PartitionCount: 3       ReplicationFactor: 3    Configs: segment.bytes=1073741824
    Topic: NotifierTemporary        Partition: 0    Leader: 1001    Replicas: 1001,1003,1002        Isr: 1001,1003,1002
    Topic: NotifierTemporary        Partition: 1    Leader: 1002    Replicas: 1002,1001,1003        Isr: 1002,1001,1003
    Topic: NotifierTemporary        Partition: 2    Leader: 1003    Replicas: 1003,1002,1001        Isr: 1003,1002,1001

When I write a series of keyed messages to my topic, they all appear to be written to the same partition (partition 0). I would expect messages with different keys to be spread across partitions 1 and 2 as well.

Here is the log output from my consumer's `message` event handler for several messages:

the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":66,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":345,"partition":0,"highWaterOffset":346,"key":"66","timestamp":"2020-03-19T00:16:57.783Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":222,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":346,"partition":0,"highWaterOffset":347,"key":"222","timestamp":"2020-03-19T00:16:57.786Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":13,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":347,"partition":0,"highWaterOffset":348,"key":"13","timestamp":"2020-03-19T00:16:57.791Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":316,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":348,"partition":0,"highWaterOffset":349,"key":"316","timestamp":"2020-03-19T00:16:57.798Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":446,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":349,"partition":0,"highWaterOffset":350,"key":"446","timestamp":"2020-03-19T00:16:57.806Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":66,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":350,"partition":0,"highWaterOffset":351,"key":"66","timestamp":"2020-03-19T00:17:27.918Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":222,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":351,"partition":0,"highWaterOffset":352,"key":"222","timestamp":"2020-03-19T00:17:27.920Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":13,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":352,"partition":0,"highWaterOffset":353,"key":"13","timestamp":"2020-03-19T00:17:27.929Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":316,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":353,"partition":0,"highWaterOffset":354,"key":"316","timestamp":"2020-03-19T00:17:27.936Z"}

Here is the kafka-node producer method that sends a message:

  * @description Adds a notification message to the Kafka topic that is not saved in a database.
  * @param {Int} recipientId - accountId of recipient of notification message
  * @param {Object} message - message payload to send to recipient
  */
  async sendTemporaryNotification(recipientId, subject, message) {
    const notificationMessage = {
      recipient: recipientId,
      subject,
      message,
    };
    // we need to validate this message schema - this will throw if invalid
    Joi.assert(notificationMessage, NotificationMessage);
    // partition based on the recipient
    const payloads = [
      { topic: KAFKA_TOPIC_TEMPORARY, messages: JSON.stringify(notificationMessage), key: notificationMessage.recipient },
    ];
    if (this.isReady) {
      await this.producer.sendAsync(payloads);
    }
    else {
      throw new ProducerNotReadyError('Notifier Producer not ready');
    }
  }
}

As you can see, none of the messages ever come from partition 1 or 2. This remains true even after continuously sending messages with random integer keys for several minutes. What could I be doing wrong?

2
Can you debug into the producer library and see which partitioner strategy was used? – Tuyen Luong

2 Answers

2
votes

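You need to configure the correct `partitionerType` option when you create the producer. Otherwise the default partitioner (type 0) is used, and your message keys are not taken into account when choosing a partition: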

// Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
// The option key must be spelled `partitionerType`: kafka-node reads that exact
// property, so a misspelling (e.g. `paritionerType`) leaves the default partitioner in place.
new Producer(client, { partitionerType: 3 });

See the docs: https://www.npmjs.com/package/kafka-node#producerkafkaclient-options-custompartitioner

1
votes

Scarysize was correct: I had not specified the partitioner type. For anyone wondering what a complete keyed-partitioning producer looks like, see the code below. I've verified that it distributes messages based on the provided keys. I used a HighLevelProducer because one of the main contributors to the kafka-node library recommended it for solving partitioning issues. I have not verified whether this solution also works with a regular Producer instead of a HighLevelProducer.

In this example, I'm sending notification messages to users keyed by their userId; that is the key the messages are partitioned on.

const { KafkaClient, HighLevelProducer, KeyedMessage } = require('kafka-node');
const Promise = require('bluebird');
const NotificationMessage = require(__dirname + '/../models/notificationMessage.js');
const ProducerNotReadyError = require(__dirname + '/../errors/producerNotReadyError.js');
const Joi = require('@hapi/joi');

// Kafka topic for temporary notification messages (ones not saved in a database);
// also exposed as the static property NotifierProducer.KAFKA_TOPIC_TEMPORARY.
const KAFKA_TOPIC_TEMPORARY = 'NotifierTemporary';

/**
 * @classdesc Producer that sends notification messages to Kafka, partitioned by recipient.
 * @class
 */
class NotifierProducer {

  /**
  * Create NotifierProducer.
  * @constructor
  * @param {String} kafkaHost - address of kafka server
  */
  constructor(kafkaHost) {
    const client = Promise.promisifyAll(new KafkaClient({ kafkaHost }));
    const producerOptions = {
      // Keyed partitioner: the payload `key` picks the partition. Without this the
      // default partitioner (type 0) is used and keys do not spread messages.
      partitionerType: HighLevelProducer.PARTITIONER_TYPES.keyed,
    };
    this.producer = Promise.promisifyAll(new HighLevelProducer(client, producerOptions));
    this.isReady = false;

    this.producer.on('ready', async () => {
      // This listener is async, so a failed metadata refresh would otherwise become
      // an unhandled promise rejection. Catch it, log it, and stay not-ready.
      try {
        await client.refreshMetadataAsync([KAFKA_TOPIC_TEMPORARY]);
      } catch (err) {
        console.error('Notifier Producer metadata refresh error: ', err);
        this.isReady = false;
        return;
      }
      console.log('Notifier Producer is operational');
      this.isReady = true;
    });

    // NOTE(review): only a later 'ready' event sets isReady back to true after an
    // error — confirm kafka-node re-emits 'ready' after reconnecting.
    this.producer.on('error', (err) => {
      console.error('Notifier Producer error: ', err);
      this.isReady = false;
    });
  }

  /**
  * @description Adds a notification message to the Kafka topic that is not saved in a database.
  * @param {number} recipientId - accountId of recipient of notification message; used as the partition key
  * @param {String} subject - subject header of the message
  * @param {Object} message - message payload to send to recipient
  * @returns {Promise<void>} resolves once the producer's sendAsync call resolves
  * @throws {ProducerNotReadyError} (as a rejected promise) when the producer is not ready
  * @throws (as a rejected promise) when the message fails Joi schema validation
  */
  async sendTemporaryNotification(recipientId, subject, message) {
    const notificationMessage = {
      recipient: recipientId,
      subject,
      message,
    };
    // we need to validate this message schema - this will throw if invalid
    Joi.assert(notificationMessage, NotificationMessage);
    if (!this.isReady) {
      throw new ProducerNotReadyError('Notifier Producer not ready');
    }
    // Partition based on the recipient. The payload-level `key` is what the kafka-node
    // README documents for the keyed partitioner; the KeyedMessage carries the same
    // key on the record itself so consumers can read it back.
    const messageKM = new KeyedMessage(notificationMessage.recipient, JSON.stringify(notificationMessage));
    const payloads = [
      { topic: KAFKA_TOPIC_TEMPORARY, messages: messageKM, key: notificationMessage.recipient },
    ];
    await this.producer.sendAsync(payloads);
  }
}

/**
 * Static property holding the name of the Kafka topic that this producer and
 * its corresponding consumer use for temporary notification messages.
 * @name NotifierProducer.KAFKA_TOPIC_TEMPORARY
 * @type {string}
 */
Object.assign(NotifierProducer, { KAFKA_TOPIC_TEMPORARY });

module.exports = NotifierProducer;