I am trying to get familiar with Apache Beam's KafkaIO and am getting the following error:

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
    at com.andrewjones.KafkaConsumerExample.main(KafkaConsumerExample.java:58)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8

The following is the code that reads messages from a Kafka topic. I would appreciate it if someone could provide some pointers.

public class KafkaConsumerJsonExample {

static final String TOKENIZER_PATTERN = "[^\\p{L}]+";

public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();

    // Create the Pipeline object with the options we defined above.
    Pipeline p = Pipeline.create(options);

    p.apply(KafkaIO.<Long, String>read()
            .withBootstrapServers("localhost:9092")
            .withTopic("myTopic2")
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)

            .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object)"earliest"))

            // We're writing to a file, which does not support unbounded data sources. This line makes it bounded to
            // the first 5 records.
            // In reality, we would likely be writing to a data source that supports unbounded data, such as BigQuery.
            .withMaxNumRecords(5)

            .withoutMetadata() // PCollection<KV<Long, String>>
    )
            .apply(Values.<String>create())

            .apply(TextIO.write().to("wordcounts"));
    System.out.println("running data pipeline");
    p.run().waitUntilFinish();
}

}

1 Answer

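The error is caused by using LongDeserializer for keys that appear to have been serialized with something other than LongSerializer. LongDeserializer expects every key to be exactly 8 bytes, so the right deserializer depends on how the records were produced.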

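So you can either use the deserializer that matches the serializer used by the producer or, if the keys don't matter (your pipeline drops them with Values.create() anyway), use StringDeserializer for the keys as well as a workaround. In that case, change the read to KafkaIO.<String, String>read() so the key type matches.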
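
To see why the "is not 8" message shows up, here is a minimal plain-JDK sketch that needs no Kafka or Beam dependency. It assumes the keys were produced as UTF-8 strings, which is only a guess about your producer. The class KeySizeDemo and its methods are made up for illustration, and decodeLong is a simplified stand-in for the length check, not Kafka's actual code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class KeySizeDemo {

    // Encodes a long as 8 big-endian bytes, the layout a long key has on the wire.
    public static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Encodes text as UTF-8, which is how a string key is typically written.
    public static byte[] encodeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical stand-in for the deserializer's length check:
    // anything that is not exactly 8 bytes is rejected.
    public static long decodeLong(byte[] data) {
        if (data.length != Long.BYTES) {
            throw new IllegalArgumentException(
                "Size of data received is not 8 (got " + data.length + ")");
        }
        return ByteBuffer.wrap(data).getLong();
    }

    public static void main(String[] args) {
        byte[] longKey = encodeLong(42L);
        byte[] stringKey = encodeString("42");

        System.out.println("long key bytes: " + longKey.length);
        System.out.println("string key bytes: " + stringKey.length);
        System.out.println("decoded long key: " + decodeLong(longKey));

        try {
            decodeLong(stringKey);
        } catch (IllegalArgumentException e) {
            System.out.println("string key rejected: " + e.getMessage());
        }
    }
}
```

The same number written as text is only as many bytes as it has characters, so a key produced as a string will almost never pass an 8-byte check. Checking how your producer serializes keys should tell you which deserializer to configure.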