
I am new to Kafka and have managed to get an Avro producer and consumer running. The producer produces messages and the consumer receives them successfully. Here is my producer code snippet:

static async void AvroProducer()
{
    string bootstrapServers = "localhost:9092";
    string schemaRegistryUrl = "Production163:8081"; 
    string topicName = "player";
    string groupName = "avro-generic-example-group";


     var s = (RecordSchema)RecordSchema.Parse(
        @"{
            ""namespace"": ""Confluent.Kafka.Examples.AvroSpecific"",
            ""type"": ""record"",
            ""name"": ""User"",
            ""fields"": [
                {""name"": ""name"", ""type"": ""string""},
                {""name"": ""favorite_number"",  ""type"": [""int"", ""null""]},
                {""name"": ""favorite_color"", ""type"": [""string"", ""null""]}
            ]
          }"
    );

    using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { SchemaRegistryUrl = schemaRegistryUrl }))
    using (var producer =
        new ProducerBuilder<string, GenericRecord>(new ProducerConfig { BootstrapServers = bootstrapServers })
            .SetKeySerializer(new AsyncAvroSerializer<string>(schemaRegistry))
            .SetValueSerializer(new AsyncAvroSerializer<GenericRecord>(schemaRegistry))
            .Build())
    {
        Console.WriteLine($"{producer.Name} producing on {topicName}. Enter user names, q to exit.");

        int i = 0;
        string text;
        while ((text = Console.ReadLine()) != "q")
        {
            var record = new GenericRecord(s);
            record.Add("name", text);
            record.Add("favorite_number", i++);
            record.Add("favorite_color", "blue");

            // Log the delivery report; the ContinueWith result was previously
            // built but silently discarded, so nothing was ever printed.
            producer.ProduceAsync(topicName, new Message<string, GenericRecord> { Key = text, Value = record })
                .ContinueWith(task => Console.WriteLine(task.IsFaulted
                    ? $"error producing message: {task.Exception.Message}"
                    : $"produced to: {task.Result.TopicPartitionOffset}"));
        }
    }
    Console.ReadLine();

}

As you can see in the code above, I am using a record schema. But now I am trying this schema:

// this is the new schema I am trying
        var s = (RecordSchema)RecordSchema.Parse(
            @"{
                ""type"": ""record"",
                ""name"": ""TestingMsg"",
                ""doc"": ""Sample"",
                ""fields"": [
                  {
                   ""name"": ""key"",
                   ""type"": ""string""
                  },
                  {
                   ""name"": ""Time"",
                   ""type"": ""long""
                  },
                  {
                   ""name"": ""sourceSeconds"",
                   ""type"": ""long""
                  },
                  {
                   ""name"": ""serverT"",
                   ""type"": ""long""
                  },

                  {
                   ""name"": ""statusCode"",
                   ""type"": ""int""
                  }
                ]
                }"
            );

With this new schema it is not working: I am not getting any messages in the consumer. And here is the consumer:

void KafkaReader(CancellationToken cancellationToken)
    {
        Debug.Log("kafka reader started. . .");
        // Set up your Kafka connection here.

        while (_keepThreadRunning)
        {
            using (CachedSchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { SchemaRegistryUrl = schemaRegistryUrl }))
            using (IConsumer<string, GenericRecord> consumer = new ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupName })
            //using (IConsumer<string, GenericRecord> consumer = new ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers})
                    .SetKeyDeserializer(new AsyncAvroDeserializer<string>(schemaRegistry).AsSyncOverAsync())
                    .SetValueDeserializer(new AsyncAvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
                    .SetErrorHandler((_, e) => Debug.Log($"Error: {e.Reason}"))
                    .Build())
            {
                Debug.Log("subscribe" );
                consumer.Subscribe(topicName);


                while (true)
                {
                    ConsumeResult<string, GenericRecord> consumeResult = consumer.Consume(cancellationToken);

                    if (consumeResult != null)
                    {
                        // Check for null before dereferencing the result;
                        // previously the Enqueue happened before the null check.
                        _stringsReceived.Enqueue(consumeResult.Value.ToString());
                        Debug.Log($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Value}");
                    }
                    else
                    {
                        Debug.Log("consumer Result is null");
                    }
                }
            }


        }

        GetComponent<KafkaServerConfigUI>().KafkaDisconnected();

        // Disconnect and clean up your connection here.


    }

Note that I am just running the default Apache Kafka Schema Registry using a batch file:

D:\ApachKafka\confluent\confluent-5.2.1\bin\windows\schema-registry-start.bat D:\ApachKafka\confluent\confluent-5.2.1\etc\schema-registry\schema-registry.properties

What am I doing wrong? Do I need to register the schema somewhere?


2 Answers


To make any change or use a new schema, you have to register the schema with the Schema Registry first. This is the step I was missing, which is why I was not getting any messages in the consumer. A short Python script can register the schema for you.

The script needs the URL of the Schema Registry (starting with http://, not just a hostname and port), the topic for which the schema should be registered, and the path to the schema file.
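My original script is not shown here, but a minimal sketch of such a script, using only the Python standard library, might look like the following. The registry URL, the topic name, and the `<topic>-value` subject naming (the default TopicNameStrategy used by the Confluent serializers) are assumptions you should adapt:

```python
# Hedged sketch: register an Avro schema with Confluent Schema Registry over
# its REST API. The registry URL and topic below are assumptions.
import json
import urllib.request


def build_register_request(registry_url, topic, schema):
    """Build the POST request that registers `schema` for `topic`'s values.

    With the default TopicNameStrategy the subject name is "<topic>-value".
    The registry expects the Avro schema itself as a JSON string inside a
    JSON object: {"schema": "..."}.
    """
    subject = f"{topic}-value"
    payload = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    return urllib.request.Request(
        url=f"{registry_url}/subjects/{subject}/versions",
        data=payload,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


if __name__ == "__main__":
    # The schema from the question.
    schema = {
        "type": "record",
        "name": "TestingMsg",
        "doc": "Sample",
        "fields": [
            {"name": "key", "type": "string"},
            {"name": "Time", "type": "long"},
            {"name": "sourceSeconds", "type": "long"},
            {"name": "serverT", "type": "long"},
            {"name": "statusCode", "type": "int"},
        ],
    }
    # Note the http:// prefix -- "Production163:8081" alone is not a valid URL.
    req = build_register_request("http://Production163:8081", "player", schema)
    with urllib.request.urlopen(req) as resp:
        print(json.loads(resp.read()))  # the registry replies with the schema id
```

Run it once per schema change; re-registering an identical schema is harmless and simply returns the existing id.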

Here is how I registered my schema: (screenshot of the registration command omitted)

Thanks to this reference: Avro and Schema Registry
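To confirm the registration took effect, you can also query the registry's REST API directly. A hedged sketch (the URL is an assumption; `/subjects` and `/subjects/<subject>/versions/latest` are standard Schema Registry endpoints):

```python
# Hedged sketch: list the subjects the registry knows about, with their
# latest version, to confirm registration worked. The URL is an assumption.
import json
import urllib.request


def fetch_json(url):
    """GET a Schema Registry endpoint and decode the JSON response."""
    with urllib.request.urlopen(url) as resp:
        return json.loads(resp.read())


if __name__ == "__main__":
    registry_url = "http://Production163:8081"
    for subject in fetch_json(f"{registry_url}/subjects"):
        latest = fetch_json(f"{registry_url}/subjects/{subject}/versions/latest")
        print(subject, "version", latest["version"])
```

If your topic's subject (e.g. `player-value`) does not appear in the output, the consumer's Avro deserializer has nothing to resolve against and messages will not come through.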


I know you already have an answer for this. Here is my suggestion to avoid running a Python script for each schema update.

You can use schema-registry-ui.

In a nutshell, schema-registry-ui provides:

- Exploring and searching schemas
- Avro evolution compatibility checks
- New schema registration
- Avro + Table schema views
- Displaying cURL commands

How to get it

git clone https://github.com/Landoop/schema-registry-ui.git
cd schema-registry-ui
npm install -g bower
npm install
http-server .

Demo

http://schema-registry-ui.landoop.com/

Docker images are also available. If you can opt for a license, try Confluent Control Center, which provides even more options.