
Sorry if this is covered in the documentation, but I haven't found it. I'm currently following this tutorial, adapting it to my needs, and I'm running into some issues, so I'm wondering whether this is the right fit for me.

https://medium.com/@tomershaiman/tutorial-building-a-s3-parquet-datalake-without-a-single-line-of-code-5ea4455edc1e

What I want to do is very similar to that tutorial. Imagine the following situation.

TL;DR: I have data in a Kafka topic in Protobuf format. I want to create a stream that converts the Protobuf data into a new topic in Avro format, and once it is in Avro a connector will consume it and dump it into an S3 bucket.

Now imagine I have a Kafka topic SearchRequest_proto in Protobuf format, and I want to create a topic called SearchRequest_avro in Avro format.

For example, my Protobuf definition would be:

syntax = "proto3";

message SearchRequest {
  string query = 1;
  int32 page_number = 2;
  int32 result_per_page = 3;
}

My first question is: does my SearchRequest ksql stream require all the fields, or could I do something like this?

CREATE STREAM SearchRequest_proto (query VARCHAR, page_number INT, result_per_page INT) WITH (KAFKA_TOPIC='SearchRequest_proto', VALUE_FORMAT='PROTOBUF');

Or can I have something like this, with only some of the fields:

CREATE STREAM SearchRequest_proto (query VARCHAR, page_number INT) WITH (KAFKA_TOPIC='SearchRequest_proto', VALUE_FORMAT='PROTOBUF');

I ask because my real proto is a bit more complex, and for testing I'm trying to use only some of the fields rather than declaring all of them. However, when I create my second stream from the first one, nothing really seems to come out.

CREATE STREAM SearchRequest_avro WITH (KAFKA_TOPIC='SearchRequest_avro', REPLICAS=1, PARTITIONS=1, VALUE_FORMAT='AVRO') AS SELECT * FROM SearchRequest_proto;

Also, if I look at my Kafka consumer groups afterwards, I can see the second stream registered as a consumer. My first topic, with the Protobuf data, contains messages, but somehow I can't even PRINT my topics; nothing is shown and I get this message:

ksql> show streams;

 Stream Name   | Kafka Topic                 | Format   
--------------------------------------------------------
 OBJ_POS_AVRO  | com.obj_pos_avro            | AVRO     
 OBJ_POS_PROTO | com.obj_pos_proto           | PROTOBUF 
--------------------------------------------------------
ksql> print "com.obj_pos_proto";
Could not find topic 'com.obj_pos_proto', or the KSQL user does not have permissions to list the topic. Topic names are case-sensitive.
ksql> print "com.obj_pos_avro";
Could not find topic 'com.obj_pos_avro', or the KSQL user does not have permissions to list the topic. Topic names are case-sensitive.

Since I see the consumer group registered but without any committed offsets, I'm wondering whether it's failing because I did not explicitly declare all the fields of my Protobuf message as part of the stream. Or maybe it's something else?

One extra point, in case anyone knows (I googled but didn't find anything): is there any way I could register my Protobuf schemas in the Schema Registry so that the streams would automatically be able to read them, without me needing to specify all those fields?
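For what it's worth, this is roughly what I was hoping for: if the Protobuf schema is already registered in Schema Registry (under the subject SearchRequest_proto-value with the default TopicNameStrategy), my understanding is that the stream could be declared without a column list and ksqlDB would infer the columns from the registered schema, something like:

-- sketch: no value columns declared; ksqlDB would pull them from Schema Registry
CREATE STREAM SearchRequest_proto WITH (
    KAFKA_TOPIC = 'SearchRequest_proto',
    VALUE_FORMAT = 'PROTOBUF'
  );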

Or maybe there is some kind of library that can consume Protobuf and generate streams or Avro-format files?

Thanks for any feedback, and sorry for the long post. As you can imagine, I'm definitely not very knowledgeable about Kafka, so this is all quite new to me.

EDIT: I did a quick test on my own and it does indeed support declaring only some of the fields, so that is not the problem. I am, however, getting a serialization/deserialization error that must be some configuration issue on my side:

[2020-09-21 08:19:32,836] INFO KafkaProtobufDeserializerConfig values: 
    bearer.auth.token = [hidden]
    proxy.port = -1
    schema.reflection = false
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    schema.registry.url = [http://confluent-schema-registry-svc:8081]
    basic.auth.user.info = [hidden]
    proxy.host = 
    specific.protobuf.value.type = class java.lang.Object
    use.latest.version = false
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    derive.type = false
    specific.protobuf.key.type = class java.lang.Object
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig)
[2020-09-21 08:19:32,836] INFO ProtobufDataConfig values: 
    schemas.cache.config = 1000
    enhanced.protobuf.schema.support = false
 (io.confluent.connect.protobuf.ProtobufDataConfig)
[2020-09-21 08:19:32,841] INFO JsonConverterConfig values: 
    converter.type = value
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = false
 (org.apache.kafka.connect.json.JsonConverterConfig)
[2020-09-21 08:19:32,844] ERROR {"type":0,"deserializationError":{"errorMessage":"Error deserializing message from topic: com.obj_pos_proto","recordB64":null,"cause":["Failed to deserialize data for topic com.obj_pos_proto to Protobuf: ","Error deserializing Protobuf message for id -1","Unknown magic byte!"]},"recordProcessingError":null,"productionError":null} (processing.CSAS_TARGET_AVRO_3.KsqlTopic.Source.deserializer)
[2020-09-21 08:19:32,845] WARN Exception caught during Deserialization, taskId: 0_2, topic: com.obj_pos_proto, partition: 2, offset: 0 (org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.common.errors.SerializationException: Error deserializing message from topic: com.obj_pos_proto
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic com.obj_pos_proto to Protobuf: 
    at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:123)
    at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:45)
    at io.confluent.ksql.serde.tls.ThreadLocalDeserializer.deserialize(ThreadLocalDeserializer.java:37)
    at io.confluent.ksql.serde.GenericRowSerDe$GenericRowDeserializer.deserialize(GenericRowSerDe.java:300)
    at io.confluent.ksql.serde.GenericRowSerDe$GenericRowDeserializer.deserialize(GenericRowSerDe.java:285)
    at io.confluent.ksql.logging.processing.LoggingDeserializer.deserialize(LoggingDeserializer.java:46)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:175)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:162)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:765)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:943)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:764)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

1 Answer


I believe I found the reason why I was having this problem.

What I wanted to do was to create a keyless stream, as described here: https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/create-stream/#example

-- keyless stream, with value columns loaded from Schema Registry:
CREATE STREAM pageviews WITH (
    KAFKA_TOPIC = 'keyless-pageviews-topic',
    VALUE_FORMAT = 'JSON'
  );

The reason this was failing was that my producer was not talking to my Schema Registry: it was writing plain Protobuf bytes instead of the Schema Registry wire format (magic byte + schema id + payload), so whenever ksqlDB tried to deserialize the data it failed with the "Unknown magic byte!" error shown above.
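In other words, the producer has to serialize with the Confluent Protobuf serializer pointed at the Schema Registry. A rough sketch of the producer configuration (the bootstrap servers value is just a placeholder for my environment; the registry URL is the one from my logs):

# use the Confluent Protobuf serializer so records get the magic byte + schema id prefix
bootstrap.servers=kafka:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
schema.registry.url=http://confluent-schema-registry-svc:8081

With that in place the serializer registers the schema and prefixes each message with the schema id, so the ksqlDB deserializer should stop complaining about the unknown magic byte.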