
I'm using the Node.js cluster module to run multiple workers. I created a basic architecture with a single MASTER process, which is an Express server handling incoming requests; its main task is to write the data from those requests into a Redis instance. The other workers (numCPUs - 1) are non-master, i.e. they don't handle any requests because they are just consumers. I have two features, namely ABC and DEF, and I distributed the non-master workers evenly across the features by assigning each a type.

For example, on an 8-core machine:

1 will be the MASTER instance, handling requests via the Express server.

The remaining (8 - 1 = 7) will be distributed evenly: 4 to feature ABC and 3 to feature DEF.

The non-master workers are the consumers, i.e. they read from Redis, which only the MASTER worker can write to.
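For reference, the even split works roughly like this (an illustrative sketch, not my actual helper; `balanceTypes` is a made-up name, and the assumption is that each type string ends up as the worker's env via `cluster.fork({ type })`):

```javascript
// Illustrative round-robin type balancing: given n non-master workers,
// alternate the feature types so the split stays as even as possible.
function balanceTypes(numWorkers, types = ['ABC', 'DEF']) {
  const assignment = [];
  for (let i = 0; i < numWorkers; i++) {
    assignment.push(types[i % types.length]);
  }
  return assignment;
}

console.log(balanceTypes(7));
// ['ABC', 'DEF', 'ABC', 'DEF', 'ABC', 'DEF', 'ABC']
// -> 4 ABC and 3 DEF, matching the 8-core example above
```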

Here's the code for the same:

if (cluster.isMaster) {
  // Fork workers.
  for (let i = 0; i < numCPUs - 1; i++) {
    ClusteringUtil.forkNewClusterWithAutoTypeBalancing();
  }

  cluster.on('exit', function(worker) {
    console.log(`Worker ${worker.process.pid}::type(${worker.type}) died`);
    ClusteringUtil.removeWorkerFromList(worker.type);
    ClusteringUtil.forkNewClusterWithAutoTypeBalancing();
  });

  // Start consuming on server-start
  ABCConsumer.start();
  DEFConsumer.start();

  console.log(`Master running with process-id: ${process.pid}`);
} else {
  console.log('CLUSTER  type', cluster.worker.process.env.type, 'running on', process.pid);
  if (
    cluster.worker.process.env &&
    cluster.worker.process.env.type &&
    cluster.worker.process.env.type === ServerTypeEnum.EXPRESS
  ) {
    // worker for handling requests
    app.use(express.json());
    ...
  }
}

Everything works fine except the consumers reading from Redis. Since there are multiple consumers for a particular feature, each one reads the same message and starts processing it individually, which is what I don't want. Say there are 4 consumers: 1 is marked busy and cannot consume until it is free, and 3 are available. Once MASTER writes a message for that feature into Redis, the problem is that all 3 available consumers of that feature start consuming it. So for a single message, the job is done as many times as there are available consumers.

const stringifiedData = JSON.stringify(req.body);
const key = uuidv1();

const asyncHsetRes = await asyncHset(type, key, stringifiedData);

if (asyncHsetRes) {
  await asyncRpush(FeatureKeyEnum.REDIS.ABC_MESSAGE_QUEUE, key);
  res.send({ status: 'success', message: 'Added to processing queue' });
} else {
  res.send({ error: 'failure', message: 'Something went wrong in adding to queue' });
}

The consumer simply accepts messages and stops when it is busy:

module.exports.startHeartbeat = startHeartbeat = async function(config = {}) {
  if (!config || !config.type || !config.listKey) {
    return;
  }

  heartbeatIntervalObj[config.type] = setInterval(async () => {
    // LINDEX only peeks at the tail of the list; it does not remove the element
    const res = await asyncLindex(config.listKey, -1);
    if (res) {
      await getFreeWorkerAndDoJob(res, config);
      stopHeartbeat(config);
    }
  }, HEARTBEAT_INTERVAL);
};
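To illustrate why every free consumer picks up the same message: `LINDEX` only peeks at the list without removing anything, so every polling consumer sees the same element, whereas an atomic pop (`RPOP`/`BRPOP`) reads and removes in one step, so only one consumer can ever receive a given element. A minimal in-memory sketch of the difference (illustrative only, a plain array standing in for the Redis list):

```javascript
// A plain array standing in for the Redis list of message keys.
const queue = ['key-1', 'key-2', 'key-3'];

// LINDEX -1 behaviour: read the last element without removing it.
function peek(q) {
  return q[q.length - 1];
}

// RPOP behaviour: read AND remove the last element in one atomic step.
function atomicPop(q) {
  return q.pop();
}

// Three free consumers polling with peek all see the same key:
const peeked = [peek(queue), peek(queue), peek(queue)];
console.log(peeked); // ['key-3', 'key-3', 'key-3'] -- duplicate processing

// Three consumers popping each get a distinct key:
const popped = [atomicPop(queue), atomicPop(queue), atomicPop(queue)];
console.log(popped); // ['key-3', 'key-2', 'key-1'] -- exclusive delivery
```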

Ideally, a message should be read by only one consumer of that particular feature. After consuming it, the consumer is marked busy so it won't consume further until it is free (I have handled this). The next message should then be processed by only one of the other available consumers.

Please help me in tackling this problem. Again, I want one message to be read by only one free consumer, while the rest of the free consumers wait for a new message.

Thanks


1 Answer


I'm not sure I fully understand your Redis consumer architecture, but I feel it contradicts the use case of Redis itself. What you're trying to achieve is essentially queue-based messaging with the ability to acknowledge a message once it's done.

Redis has its own pub/sub feature, but it is built on the fire-and-forget principle. It doesn't distinguish between consumers: it just sends the data to all of them, assuming it's their logic to handle the incoming data.
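In miniature, fire-and-forget delivery looks like this (a plain-JS sketch, no Redis client involved; all names are made up): every subscriber gets every message, and nothing records who processed it.

```javascript
// Toy publish/subscribe: publish() hands each message to every subscriber.
const subscribers = [];

function subscribe(handler) {
  subscribers.push(handler);
}

function publish(message) {
  // No routing, no acknowledgment, no record of delivery:
  subscribers.forEach(handler => handler(message));
}

const received = [];
subscribe(msg => received.push(['consumer-1', msg]));
subscribe(msg => received.push(['consumer-2', msg]));
publish('job-42');

console.log(received);
// [['consumer-1', 'job-42'], ['consumer-2', 'job-42']]
// -- the same duplication the question describes
```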

I recommend you use a queue server like RabbitMQ. You can achieve your goal with features that AMQP 0-9-1 supports: message acknowledgment, a consumer prefetch count, and so on. You can set up your cluster with a very flexible configuration like: I want X consumers, each handling 1 unique (!) message at a time, and each receiving a new one only after letting the server (RabbitMQ) know that it successfully finished processing the previous message. This is highly configurable and robust.
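To make the prefetch/ack idea concrete, here is a toy in-memory model of a channel with a prefetch count of 1 and manual acknowledgments (illustrative only; with amqplib the real equivalents are `channel.prefetch(1)` and `channel.ack(msg)`):

```javascript
// Toy model: a consumer holding an unacknowledged message receives nothing
// new until it acks; a waiting message goes to the next free consumer.
function makeChannel() {
  const pending = [];         // messages not yet delivered
  const unacked = new Set();  // consumers still processing
  const consumers = [];

  function deliver() {
    for (const c of consumers) {
      if (pending.length > 0 && !unacked.has(c.name)) {
        unacked.add(c.name);
        const msg = pending.shift();
        // The ack callback frees the consumer and triggers redelivery.
        c.handle(msg, () => { unacked.delete(c.name); deliver(); });
      }
    }
  }

  return {
    publish(msg) { pending.push(msg); deliver(); },
    consume(name, handle) { consumers.push({ name, handle }); deliver(); },
  };
}

const ch = makeChannel();
const log = [];
const acks = {};
ch.consume('A', (msg, ack) => { log.push(`A:${msg}`); acks[msg] = ack; });
ch.consume('B', (msg, ack) => { log.push(`B:${msg}`); acks[msg] = ack; });
ch.publish('m1'); // goes to A
ch.publish('m2'); // goes to B
ch.publish('m3'); // waits -- both consumers are busy
acks['m1']();     // A acks m1 and only then receives m3

console.log(log); // ['A:m1', 'B:m2', 'A:m3']
```

Each message is delivered to exactly one consumer, and a busy consumer never receives a second one, which is exactly the behavior asked for.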

However, if you want to go serverless with a fully managed service, so that you don't have to provision virtual machines or anything else to run a message-queue server of your choice, you can use AWS SQS. It has a very similar API and feature list.

Hope it helps!