Am trying to get familiar with Apache beam Kafka IO and getting following error
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
at com.andrewjones.KafkaConsumerExample.main(KafkaConsumerExample.java:58)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
Following is piece of code which reads messages from a kafka topic. Appreciate if you someone can provide some pointers.
public class KafkaConsumerJsonExample { static final String TOKENIZER_PATTERN = "[^\p{L}]+";
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
// Create the Pipeline object with the options we defined above.
Pipeline p = Pipeline.create(options);
p.apply(KafkaIO.<Long, String>read()
.withBootstrapServers("localhost:9092")
.withTopic("myTopic2")
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object)"earliest"))
// We're writing to a file, which does not support unbounded data sources. This line makes it bounded to
// the first 5 records.
// In reality, we would likely be writing to a data source that supports unbounded data, such as BigQuery.
.withMaxNumRecords(5)
.withoutMetadata() // PCollection<KV<Long, String>>
)
.apply(Values.<String>create())
.apply(TextIO.write().to("wordcounts"));
System.out.println("running data pipeline");
p.run().waitUntilFinish();
}
}