0
votes

I am trying a POC for Kafka in my project and created two console apps in .net core 2.1 using Confluent.kafka library. I have installed Kafka on Ubuntu box and it is running properly. When i am pushing thousands of messages into Kafka using Producer console app and appending a sequence number in the message. When i am consuming those messages in my consumer console app the messages are not in correct order. There is only one Producer and Consumer and they both are tied to one topic. Below is the code of my Producer

public class Kafta
{
    private Dictionary<string, object> config;
    private string topicName;

    public Kafta(string topic)
    {
        config = new Dictionary<string, object>
        {
            {"bootstrap.servers","192.168.60.173:9092" }
        };
        topicName = topic;
    }
    public async void SendMessageAsync(string message)
    {
        using (var producer = new Producer<string, string>(config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))
        {
            var msg = await producer.ProduceAsync(topicName, "userid", message);

            //producer.ProduceAsync("console", null, message);
            producer.Flush(500);
        }
    }
}

Program.cs static void main of Producer

static void Main(string[] args)
    {
        string topic = "tester2";
        long count = 1;
        Console.WriteLine("Starting to send message");
        Console.WriteLine("Write the message here: ");

        if(args.Length == 2)
        {
            topic = args[0];
            count = long.Parse(args[1]);
        }
        try
        {
            Console.WriteLine("Topic name " + topic);
            var message = Console.ReadLine();            
            var service = new Kafta(topic);

            for(var i = 0; i<count;i++)
            {
                var msg = message + " number " + i.ToString();
                Console.WriteLine("Message to Kafta: " + msg);
                service.SendMessageAsync(msg);
            }

        }
        catch (Exception ex)
        {
            Console.WriteLine("Exception occured " + ex.Message);
        }
        finally
        {
            Console.WriteLine("Press any key to exit");
            Console.Read();
        }
    }

Consumer code

static void Main(string[] args)
    {
        var config = new Dictionary<string, object>
        {
          { "group.id", "sample-consumer" },
          { "bootstrap.servers", "192.168.60.173:9092" },
          { "enable.auto.commit", "false"}
        };
        string topic = "tester2";
        if (args.Length == 1)
            topic = args[0];
        using (var consumer = new Consumer<string, string>(config, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)))
        {
            consumer.Subscribe(new string[] { topic });                
            consumer.OnMessage += (_, msg) =>
            {
                Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
                consumer.CommitAsync(msg);

            };

            while (true)
            {
                consumer.Poll(100);
            }
        }
    }

Output from Producer

Message to Kafta: message number 0
Message to Kafta: message number 1
Message to Kafta: message number 2
Message to Kafta: message number 3
Message to Kafta: message number 4
Message to Kafta: message number 5
Message to Kafta: message number 6
Message to Kafta: message number 7
Message to Kafta: message number 8
Message to Kafta: message number 9

Output from Consumer:

message number 4
message number 7
message number 0
message number 1
message number 2
message number 3
message number 5
message number 6
message number 8
message number 9

I am new to Kafka and not sure what i am missing to make it working correctly. As per Kafka documentation ordering of message is guaranteed for my use case so there must be some silly mistake i am doing and unable to figure it out.

Is there any other alternative to Kafka which i can use?

Thanks

2
Not clear how many partitions your topic hasOneCricketeer

2 Answers

2
votes

As per Kafka documentation ordering of message is guaranteed

Only per partition. From your question, you didn't mention how many partitions your topic has. You're printing Topic: {msg.Topic} Partition: {msg.Partition}, but that's not the output of your post..

In your producer, you're doing "fire and forget" with SendMessageAsync and not verifying the broker actually received the message with the return value of that method. So that's one possibility - your print statement there will be in order, but the messages aren't necessarily reaching the broker that way.

If the partition number is always the same in the consumer output shown in the code, while I'm not familiar with C# API, it looks like you're using a non-blocking consumer message listener. That OnMessage function is likely getting invoked in a separate thread, which doesn't necessarily write to the standard output in a guaranteed order. A better test might be to insert a timestamp with each message rather than only a counter

Is there any other alternative to Kafka which i can use?

Plenty of other MQ technologies exist, such as RabbitMQ, so if you don't care about the persistence features and other APIs (Streams and Connect) of Kafka, feel free to use those

0
votes

As @cricket_007 mentioned, having one topic and multiple partitions means, that only data received from one partition are ordered.

When you create a consumer (only one), it takes all partitions to read the messages from. Then, data from partitions are red SYNCHRONOUSLY (yes) but the partition you receive message from changes at time.

Let's say you produced 100 messages to topic with 4 partitions. For the sake of brevity say each partition stored 25 messages. When you start the consumer, it then receives messages like this (example):

  • 5 messages from partition #1
  • 4 messages from partition #2
  • 6 messages from partition #3
  • 2 messages from partition #4
  • 3 messages from partition #1
  • ...

That is because consumer tries to read all partitions evenly.