I'm trying to deploy a Flink job to a cluster based on the flink:1.4.1-hadoop27-scala_2.11-alpine image. The job uses a Kafka connector source (flink-connector-kafka-0.11) to which I'm trying to assign timestamps and watermarks. My code closely follows the Scala example in the Flink Kafka connector documentation, except that it uses FlinkKafkaConsumer011:
val myConsumer = new FlinkKafkaConsumer011[String]("topic", new SimpleStringSchema(), properties)
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
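CustomWatermarkEmitter is essentially the punctuated assigner from that documentation example; a simplified sketch (the timestamp parsing here is illustrative only, the real logic doesn't matter for this problem):

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class CustomWatermarkEmitter extends AssignerWithPunctuatedWatermarks[String] {

  // Illustrative assumption: each record starts with an "<epoch-millis>:" prefix.
  override def extractTimestamp(element: String, previousElementTimestamp: Long): Long =
    element.takeWhile(_ != ':').toLong

  // Emit a watermark after every element, as in the docs example.
  override def checkAndGetNextWatermark(lastElement: String, extractedTimestamp: Long): Watermark =
    new Watermark(extractedTimestamp)
}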
This works great when running locally from my IDE. However, in the cluster environment I get the following error:
java.lang.ClassNotFoundException: com.my.organization.CustomWatermarkEmitter
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1863)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1746)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2037)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:393)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:380)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:368)
at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.createPartitionStateHolders(AbstractFetcher.java:521)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.<init>(AbstractFetcher.java:167)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:89)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.<init>(Kafka010Fetcher.java:62)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:203)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:564)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
I'm building my job as a fat jar, which I have verified contains this class. Notably, the stack trace shows the class being resolved through sun.misc.Launcher$AppClassLoader, i.e. the system class loader, not Flink's user-code class loader. Does this example from the documentation only work if the CustomWatermarkEmitter class is in the /opt/flink/lib/ folder?
That is how I ended up solving the issue. But having to build this class separately and place it in /opt/flink/lib complicates my build process significantly, so I'm wondering whether this is the way it's supposed to be solved, or whether there are other ways around this problem.
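Concretely, the workaround amounts to baking the assigner into the image's /opt/flink/lib, roughly like this (the separate watermark-emitter.jar is hypothetical, built by an extra step in my build):

# Sketch of the workaround image; "watermark-emitter.jar" is a hypothetical
# jar built separately, containing only CustomWatermarkEmitter and its dependencies.
FROM flink:1.4.1-hadoop27-scala_2.11-alpine
COPY watermark-emitter.jar /opt/flink/lib/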
For example, this section in the Flink documentation hints that some sources must be provided with a UserCodeClassLoader manually. Does that include the provided Kafka source?
It does seem to use a userCodeClassLoader internally, as far as I can tell from org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher:
case PERIODIC_WATERMARKS: {
    for (Map.Entry<KafkaTopicPartition, Long> partitionEntry : partitionsToInitialOffsets.entrySet()) {
        KPH kafkaHandle = createKafkaPartitionHandle(partitionEntry.getKey());

        AssignerWithPeriodicWatermarks<T> assignerInstance =
                watermarksPeriodic.deserializeValue(userCodeClassLoader);

        KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> partitionState =
                new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
                        partitionEntry.getKey(),
                        kafkaHandle,
                        assignerInstance);

        partitionState.setOffset(partitionEntry.getValue());
        partitionStates.add(partitionState);
    }
    // ... (rest of the case elided)
}
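So the fetcher revives the assigner via SerializedValue.deserializeValue(ClassLoader), and whichever loader is passed in determines which classes can be resolved. As a standalone illustration of that dependency (this is not the fetcher's actual wiring, just the same API):

import org.apache.flink.util.SerializedValue

// Serialize the assigner, as the Kafka consumer does when the job is submitted...
val serialized = new SerializedValue(new CustomWatermarkEmitter())

// ...and bring it back. If the loader passed here cannot see the classes in the
// fat jar (e.g. it is the system class loader rather than Flink's user-code
// class loader), this line throws exactly the ClassNotFoundException above.
val restored = serialized.deserializeValue(getClass.getClassLoader)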
EDIT:
I have created a simple project where this issue can be reproduced: https://github.com/lragnarsson/flink-kafka-classpath-problem
To reproduce it, you need Docker and docker-compose. Just do:
- git clone https://github.com/lragnarsson/flink-kafka-classpath-problem.git
- cd flink-kafka-classpath-problem/docker
- docker-compose build
- docker-compose up
- Go to localhost:8081 in your browser
- Submit the included jar file from target/scala-2.11/flink-kafka-classpath-problem-assembly-0.1-SNAPSHOT.jar
This should lead to the exception java.lang.ClassNotFoundException: se.ragnarsson.lage.MyTimestampExtractor