I am trying a POC for Kafka in my project and created two console apps in .net core 2.1 using Confluent.kafka library. I have installed Kafka on Ubuntu box and it is running properly. When i am pushing thousands of messages into Kafka using Producer console app and appending a sequence number in the message. When i am consuming those messages in my consumer console app the messages are not in correct order. There is only one Producer and Consumer and they both are tied to one topic. Below is the code of my Producer
public class Kafta
{
private Dictionary<string, object> config;
private string topicName;
public Kafta(string topic)
{
config = new Dictionary<string, object>
{
{"bootstrap.servers","192.168.60.173:9092" }
};
topicName = topic;
}
public async void SendMessageAsync(string message)
{
using (var producer = new Producer<string, string>(config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))
{
var msg = await producer.ProduceAsync(topicName, "userid", message);
//producer.ProduceAsync("console", null, message);
producer.Flush(500);
}
}
}
Program.cs static void main of Producer
static void Main(string[] args)
{
string topic = "tester2";
long count = 1;
Console.WriteLine("Starting to send message");
Console.WriteLine("Write the message here: ");
if(args.Length == 2)
{
topic = args[0];
count = long.Parse(args[1]);
}
try
{
Console.WriteLine("Topic name " + topic);
var message = Console.ReadLine();
var service = new Kafta(topic);
for(var i = 0; i<count;i++)
{
var msg = message + " number " + i.ToString();
Console.WriteLine("Message to Kafta: " + msg);
service.SendMessageAsync(msg);
}
}
catch (Exception ex)
{
Console.WriteLine("Exception occured " + ex.Message);
}
finally
{
Console.WriteLine("Press any key to exit");
Console.Read();
}
}
Consumer code
static void Main(string[] args)
{
var config = new Dictionary<string, object>
{
{ "group.id", "sample-consumer" },
{ "bootstrap.servers", "192.168.60.173:9092" },
{ "enable.auto.commit", "false"}
};
string topic = "tester2";
if (args.Length == 1)
topic = args[0];
using (var consumer = new Consumer<string, string>(config, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)))
{
consumer.Subscribe(new string[] { topic });
consumer.OnMessage += (_, msg) =>
{
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
consumer.CommitAsync(msg);
};
while (true)
{
consumer.Poll(100);
}
}
}
Output from Producer
Message to Kafta: message number 0
Message to Kafta: message number 1
Message to Kafta: message number 2
Message to Kafta: message number 3
Message to Kafta: message number 4
Message to Kafta: message number 5
Message to Kafta: message number 6
Message to Kafta: message number 7
Message to Kafta: message number 8
Message to Kafta: message number 9
Output from Consumer:
message number 4
message number 7
message number 0
message number 1
message number 2
message number 3
message number 5
message number 6
message number 8
message number 9
I am new to Kafka and not sure what i am missing to make it working correctly. As per Kafka documentation ordering of message is guaranteed for my use case so there must be some silly mistake i am doing and unable to figure it out.
Is there any other alternative to Kafka which i can use?
Thanks