
I've only found one thread with information about this topic: How to Deserialising Kafka AVRO messages using Apache Beam

However, after trying a few variations of Kafka deserializers, I still cannot deserialize the Kafka messages. Here's my code:

public class Readkafka {
    private static final Logger LOG = LoggerFactory.getLogger(Readkafka.class);

    public static void main(String[] args) throws IOException {
        // Create the Pipeline object with the options we defined above.
        Pipeline p = Pipeline.create(
                PipelineOptionsFactory.fromArgs(args).withValidation().create());
       PTransform<PBegin, PCollection<KV<action_states_pkey, String>>> kafka =
                KafkaIO.<action_states_pkey, String>read()
                    .withBootstrapServers("mybootstrapserver")
                    .withTopic("action_States")
                    .withKeyDeserializer(MyClassKafkaAvroDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    .updateConsumerProperties(ImmutableMap.of("schema.registry.url", (Object)"schemaregistryurl"))
                    .withMaxNumRecords(5)
                    .withoutMetadata();


        p.apply(kafka)
            .apply(Keys.<action_states_pkey>create());
    }
}

where MyClassKafkaAvroDeserializer is:

public class MyClassKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer
        implements Deserializer<action_states_pkey> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        configure(new KafkaAvroDeserializerConfig(configs));
    }

    @Override
    public action_states_pkey deserialize(String s, byte[] bytes) {
        return (action_states_pkey) this.deserialize(bytes);
    }

    @Override
    public void close() {}
}

and the class action_states_pkey is generated from the Avro schema using avro-tools:

java -jar pathtoavrotools/avro-tools-1.8.1.jar compile schema pathtoschema/action_states_pkey.avsc destination path

where action_states_pkey.avsc is literally:

{"type":"record","name":"action_states_pkey","namespace":"namespace","fields":[{"name":"ad_id","type":["null","int"]},{"name":"action_id","type":["null","int"]},{"name":"state_id","type":["null","int"]}]}

With this code I'm getting the error:

Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to my.mudah.beam.test.action_states_pkey
    at my.mudah.beam.test.MyClassKafkaAvroDeserializer.deserialize(MyClassKafkaAvroDeserializer.java:20)
    at my.mudah.beam.test.MyClassKafkaAvroDeserializer.deserialize(MyClassKafkaAvroDeserializer.java:1)
    at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:221)
    at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter$Reader.advanceWithBackoff(BoundedReadFromUnboundedSource.java:279)
    at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter$Reader.start(BoundedReadFromUnboundedSource.java:256)
    at com.google.cloud.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:592)
    ... 14 more

It seems there's an error when trying to map the Avro data to my custom class?

Alternatively, I've tried the following code :

        PTransform<PBegin, PCollection<KV<action_states_pkey, String>>> kafka =
                KafkaIO.<action_states_pkey, String>read()
                    .withBootstrapServers("bootstrapserver")
                    .withTopic("action_states")
                    .withKeyDeserializerAndCoder((Class)KafkaAvroDeserializer.class, AvroCoder.of(action_states_pkey.class))
                    .withValueDeserializer(StringDeserializer.class)
                    .updateConsumerProperties(ImmutableMap.of("schema.registry.url", (Object)"schemaregistry"))
                    .withMaxNumRecords(5)
                    .withoutMetadata();


        p.apply(kafka)
            .apply(Keys.<action_states_pkey>create());
//            .apply("ExtractWords", ParDo.of(new DoFn<action_states_pkey, String>() {
//                @ProcessElement
//                public void processElement(ProcessContext c) {
//                  action_states_pkey key = c.element();
//                    c.output(key.getAdId().toString());
//                }
//            }));

which does not give me any error until I try to print out the data. I have to verify that I'm successfully reading the data one way or another, so my intent here is to log the data to the console. If I uncomment the commented section, I get the same error once again:

SEVERE: 2019-09-13T07:53:56.168Z: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to my.mudah.beam.test.action_states_pkey
    at my.mudah.beam.test.Readkafka$1.processElement(Readkafka.java:151)

Another thing to note is that specifying:

.updateConsumerProperties(ImmutableMap.of("specific.avro.reader", (Object)"true"))

always gives me the error:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 443
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class NAMESPACE.action_states_pkey specified in writer's schema whilst finding reader's schema for a SpecificRecord.

It seems there's something wrong with my approach. If anyone has experience reading AVRO data from Kafka using Apache Beam, please do help me out. I greatly appreciate it.

Here's a snapshot of my package with the schema and class in it as well: [screenshot of the package/working path]

Thanks.

You seem to be using the Confluent Schema Registry? So why not use their existing Avro deserializers? And if you do, then that isn't compatible with the other deserializer implementation you have... In other words, how was the data actually produced? Using schema registry, or raw Avro data? - OneCricketeer
I was under the assumption that I'm already using Confluent's deserializers? import io.confluent.kafka.serializers.KafkaAvroDeserializer; is the library I'm using - Nicholas Leong Zhi Hao
Sorry, I don't quite understand what you mean by 'how was the data actually produced? Using schema registry, or raw Avro data?' I got the schema from the schema registry, compiled it into a Java class, then proceeded - Nicholas Leong Zhi Hao
Sorry, I thought you'd written a different deserializer that wasn't using the registry at first. I think your second approach is correct. I'm not sure why the namespace is being capitalized - OneCricketeer
Sure. Was trying to mask it for privacy reasons. Here's the error: Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 443 Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class com.dattran.bottledwater.dbschema.public.action_states_pkey specified in writer's schema whilst finding reader's schema for a SpecificRecord. I'm guessing com.dattran.bottledwater.dbschema.public is where the schema is located in the schema registry? - Nicholas Leong Zhi Hao

1 Answer


public class MyClassKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer

Your class extends AbstractKafkaAvroDeserializer, whose deserialize method returns a GenericRecord.

You need to convert the GenericRecord to your custom object.
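For example, here's a minimal sketch (an assumption on my part, not the asker's actual code) of a key deserializer that copies the GenericRecord fields into the generated class instead of casting; the getter/setter names are inferred from the action_states_pkey.avsc shown in the question:

import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Deserializer;

public class MyClassKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer
        implements Deserializer<action_states_pkey> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        configure(new KafkaAvroDeserializerConfig(configs));
    }

    @Override
    public action_states_pkey deserialize(String topic, byte[] bytes) {
        // The parent class returns a GenericRecord, so copy its fields into the
        // generated class instead of casting. Field and setter names are assumed
        // from the action_states_pkey.avsc in the question.
        GenericRecord record = (GenericRecord) this.deserialize(bytes);
        if (record == null) {
            return null;
        }
        action_states_pkey key = new action_states_pkey();
        key.setAdId((Integer) record.get("ad_id"));
        key.setActionId((Integer) record.get("action_id"));
        key.setStateId((Integer) record.get("state_id"));
        return key;
    }

    @Override
    public void close() {}
}

With something like this, withKeyDeserializer(MyClassKafkaAvroDeserializer.class) from the first snippet should no longer hit the ClassCastException, because no GenericData$Record is ever cast to action_states_pkey.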

OR

Use SpecificRecord for this as stated in one of the following answers:

/**
 * Extends deserializer to support ReflectData.
 *
 * @param <V>
 *     value type
 */
public abstract class ReflectKafkaAvroDeserializer<V> extends KafkaAvroDeserializer {

  private Schema readerSchema;
  private DecoderFactory decoderFactory = DecoderFactory.get();

  protected ReflectKafkaAvroDeserializer(Class<V> type) {
    readerSchema = ReflectData.get().getSchema(type);
  }

  @Override
  protected Object deserialize(
      boolean includeSchemaAndVersion,
      String topic,
      Boolean isKey,
      byte[] payload,
      Schema readerSchemaIgnored) throws SerializationException {

    if (payload == null) {
      return null;
    }

    int schemaId = -1;
    try {
      ByteBuffer buffer = ByteBuffer.wrap(payload);
      if (buffer.get() != MAGIC_BYTE) {
        throw new SerializationException("Unknown magic byte!");
      }

      schemaId = buffer.getInt();
      Schema writerSchema = schemaRegistry.getByID(schemaId);

      int start = buffer.position() + buffer.arrayOffset();
      int length = buffer.limit() - 1 - idSize;
      DatumReader<Object> reader = new ReflectDatumReader(writerSchema, readerSchema);
      BinaryDecoder decoder = decoderFactory.binaryDecoder(buffer.array(), start, length, null);
      return reader.read(null, decoder);
    } catch (IOException e) {
      throw new SerializationException("Error deserializing Avro message for id " + schemaId, e);
    } catch (RestClientException e) {
      throw new SerializationException("Error retrieving Avro schema for id " + schemaId, e);
    }
  }
}
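To plug the reflect-based deserializer above into the KafkaIO read from the question, a concrete subclass bound to the generated class is needed, since the constructor is protected and Kafka instantiates deserializers via a no-arg constructor. A rough sketch, reusing the placeholder values from the question (ActionStatesPkeyDeserializer is a hypothetical name):

public class ActionStatesPkeyDeserializer extends ReflectKafkaAvroDeserializer<action_states_pkey> {
    public ActionStatesPkeyDeserializer() {
        super(action_states_pkey.class);
    }
}

PTransform<PBegin, PCollection<KV<action_states_pkey, String>>> kafka =
        KafkaIO.<action_states_pkey, String>read()
            .withBootstrapServers("bootstrapserver")
            .withTopic("action_states")
            // raw Class cast, as in the question, because KafkaAvroDeserializer
            // implements Deserializer<Object>
            .withKeyDeserializerAndCoder((Class) ActionStatesPkeyDeserializer.class,
                AvroCoder.of(action_states_pkey.class))
            .withValueDeserializer(StringDeserializer.class)
            .updateConsumerProperties(ImmutableMap.of("schema.registry.url", (Object) "schemaregistry"))
            .withMaxNumRecords(5)
            .withoutMetadata();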

The above is copied from https://stackoverflow.com/a/39617120/2534090

See also: https://stackoverflow.com/a/42514352/2534090