I've implemented a Beam pipeline reading from Kafka, based on the docs here: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L125
The pipeline itself works fine for bounded sources and I have test cases where it reads from files without issue.
The code reading from Kafka is very simple, and basically identical to the example:
    PCollection<String> input = p.apply(KafkaIO.<Long, String>read()
            .withBootstrapServers(KAFKA_BROKER)
            .withTopics(Arrays.asList(KAFKA_READ_TOPIC))
            .withKeyCoder(BigEndianLongCoder.of())
            .withValueCoder(StringUtf8Coder.of())
            .withTimestampFn(new TimestampKafkaStrings())
            .withoutMetadata())
        .apply(Values.<String>create());
The application starts fine and seems to connect to Kafka. However, as soon as I write to Kafka from another process and the pipeline starts reading, I get the following exception on the first read:
INFO: Kafka version : 0.10.2.0
Apr 04, 2017 9:46:18 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : 576d93a8dc0cf421
Apr 04, 2017 9:46:30 AM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader advance
INFO: Reader-0: first record offset 2000
Apr 04, 2017 9:46:30 AM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader consumerPollLoop
INFO: Reader-0: Returning from consumer pool loop
[WARNING]
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: java.io.EOFException
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:453)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:350)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:71)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:210)
at com.groupbyinc.beam.SessionRollup.main(SessionRollup.java:186)
... 6 more
Caused by: org.apache.beam.sdk.coders.CoderException: java.io.EOFException
at org.apache.beam.sdk.coders.BigEndianLongCoder.decode(BigEndianLongCoder.java:64)
at org.apache.beam.sdk.coders.BigEndianLongCoder.decode(BigEndianLongCoder.java:33)
at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.decode(KafkaIO.java:1018)
at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.advance(KafkaIO.java:989)
at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.startReader(UnboundedReadEvaluatorFactory.java:190)
at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:128)
at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:139)
at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
It seems like something is wrong with the way the key coder (BigEndianLongCoder) attempts to decode the Kafka message key. In the source data these keys are not set explicitly, so I assume they default to timestamps within Kafka(?).
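A minimal sketch of how this hypothesis could be checked outside Beam, using only the JDK. It assumes the coder reads a fixed 8 bytes in big-endian order, which the EOFException in the trace suggests but which I have not confirmed from the Beam source, and it assumes a record written without a key reaches the coder as zero bytes. The names KeyDecodeCheck and tryDecodeLong are hypothetical and exist only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;

public class KeyDecodeCheck {

    /**
     * Tries to read one big-endian long (8 bytes) from the given key bytes.
     * Assumption: this mirrors what a fixed-width long coder would do.
     */
    static String tryDecodeLong(byte[] keyBytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(keyBytes))) {
            return "decoded " + in.readLong();
        } catch (EOFException e) {
            // Fewer than 8 bytes were available.
            return "EOFException";
        } catch (IOException e) {
            return "IOException: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Assumption: a record produced without a key arrives as zero bytes.
        byte[] emptyKey = new byte[0];
        // A key that really is an 8-byte big-endian long.
        byte[] longKey = ByteBuffer.allocate(Long.BYTES).putLong(42L).array();

        System.out.println(tryDecodeLong(emptyKey)); // prints "EOFException"
        System.out.println(tryDecodeLong(longKey));  // prints "decoded 42"
    }
}
```

If the assumptions hold, any key shorter than 8 bytes (including a missing one) would fail this way, which would point at the key coder choice rather than at the timestamp function.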
Any ideas on how to debug this issue further? Are there resources or functioning examples I can look at?
EDIT: Removing the .withTimestampFn() portion of the pipeline has no effect, and the code seems to fail before it ever gets to that point.