I've been looking through several examples of the Confluent.Kafka client (https://github.com/confluentinc/confluent-kafka-dotnet/), and whilst I can successfully get a producer to push a message into Kafka, I'm unable to pull any messages back down with consumers.
Through the UI I can see that the topic has been created and that messages are going into it (there are currently 10 partitions and 3 messages), but my consumer always reports "end of partition" without consuming any messages (the 3 remain on the topic and "OnMessage" never fires).
However, the consumer is definitely accessing the topic, and it can see the 3 messages on one of the partitions:
end of partition: dotnet-test-topic [6] @3
It just doesn't consume the message and trigger OnMessage(). Any ideas?
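One possible factor, offered as an assumption rather than a confirmed diagnosis: the configuration in this post generates a fresh `group.id` on every run, so the group never has committed offsets. With no committed offsets, the underlying librdkafka setting `auto.offset.reset` decides where reading starts, and its default is the end of the partition. That would match a consumer that reports `@3` on a partition holding 3 messages without ever delivering them. A minimal sketch of a consumer configuration that asks for the earliest offset instead follows. The variable name `consumerConf` is hypothetical, and whether the key is accepted at the top level of the dictionary (rather than nested under `default.topic.config`) depends on the client version, so treat that placement as an assumption.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical consumer-only configuration. Every entry except the last is
// copied from the post; "auto.offset.reset" is the only addition.
var consumerConf = new Dictionary<string, object>
{
    { "group.id", Guid.NewGuid().ToString() },
    { "bootstrap.servers", "mykafkacluster:9094" },
    { "sasl.mechanisms", "SCRAM-SHA-256" },
    { "security.protocol", "SASL_SSL" },
    { "sasl.username", "myuser" },
    { "sasl.password", "mypass" },
    // Assumption: start from the oldest available message when the group
    // has no committed offset, instead of the default (end of partition).
    { "auto.offset.reset", "earliest" }
};
```

If this is the cause, messages produced while the consumer is already running should still arrive with the original configuration, which would be a quick way to check.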
var conf = new Dictionary<string, object>
{
    { "group.id", Guid.NewGuid().ToString() },
    { "bootstrap.servers", "mykafkacluster:9094" },
    { "sasl.mechanisms", "SCRAM-SHA-256" },
    { "security.protocol", "SASL_SSL" },
    { "sasl.username", "myuser" },
    { "sasl.password", "mypass" }
};

using (var producer = new Producer<string, string>(conf, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))
{
    producer.ProduceAsync("dotnet-test-topic", "some key", "some value")
        .ContinueWith(result =>
        {
            // Report either the delivery failure or the offset the message landed at.
            var msg = result.Result;
            if (msg.Error.Code != ErrorCode.NoError)
                Console.WriteLine($"failed to deliver message: {msg.Error.Reason}");
            else
                Console.WriteLine($"delivered to: {result.Result.TopicPartitionOffset}");
        });
}

using (var consumer = new Consumer<string, string>(conf, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)))
{
    consumer.OnConsumeError += (_, err)
        => Console.WriteLine($"consume error: {err.Error.Reason}");
    consumer.OnMessage += (_, msg)
        => Console.WriteLine($"consumed: {msg.Value}");
    consumer.OnPartitionEOF += (_, tpo)
        => Console.WriteLine($"end of partition: {tpo}");
    while (true)