2
votes

I'm trying to deploy a Flink job to a cluster based on the flink:1.4.1-hadoop27-scala_2.11-alpine image. The job is using a Kafka connector source (flink-connector-kafka-0.11) to which I'm trying to assign timestamps and watermarks. My code is very similar to the Scala example in the Flink Kafka connector documentation. But with FlinkKafkaConsumer011

val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())

This works great when running locally from my IDE. However, in the cluster envrionment I get the following error:

java.lang.ClassNotFoundException: com.my.organization.CustomWatermarkEmitter
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1863)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1746)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2037)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:393)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:380)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:368)
at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.createPartitionStateHolders(AbstractFetcher.java:521)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.<init>(AbstractFetcher.java:167)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:89)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.<init>(Kafka010Fetcher.java:62)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:203)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:564)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)

I'm building my job as a fat jar which I have verified contains this class. Does this example from the documentation only work if the CustomWatermarkEmitter class is in the /opt/flink/lib/ folder?

This is the way I had to solve the issue. But having to build this class separately and placing it in /opt/flink/lib complicates my build process significantly so I was wondering if this is the way it's supposed to be solved or if there are other ways around this problem?

For example this section in the Flink documentation hints at having to provide some sources a UserCodeClassLoader manually? Including the provided Kafka source?

It seems to use a "userCodeClassLoader" internally as far as I could see in org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher:

            case PERIODIC_WATERMARKS: {
            for (Map.Entry<KafkaTopicPartition, Long> partitionEntry : partitionsToInitialOffsets.entrySet()) {
                KPH kafkaHandle = createKafkaPartitionHandle(partitionEntry.getKey());

                AssignerWithPeriodicWatermarks<T> assignerInstance =
                        watermarksPeriodic.deserializeValue(userCodeClassLoader);

                KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> partitionState =
                        new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
                                partitionEntry.getKey(),
                                kafkaHandle,
                                assignerInstance);

                partitionState.setOffset(partitionEntry.getValue());

                partitionStates.add(partitionState);
            }

EDIT:

I have created a simple project where this issue can be reprodued here: https://github.com/lragnarsson/flink-kafka-classpath-problem

In order to reproduce, you need docker and docker-compose.

just do:

  1. git clone https://github.com/lragnarsson/flink-kafka-classpath-problem.git
  2. cd flink-kafka-classpath-problem/docker
  3. docker-compose build
  4. docker-compose up
  5. Go to localhost:8081 in your browser
  6. Submit included jar file from target/scala-2.11/flink-kafka-classpath-problem-assembly-0.1-SNAPSHOT.jar

This should lead to the exception java.lang.ClassNotFoundException: se.ragnarsson.lage.MyTimestampExtractor

1
Are you able to put a sanitized version of your actual code? I've successfully done this exact thing with no issues (Flink 1.4, Kafka 0.11 connector, source-based timestamps), so the issue may be subtle.Joshua DeWald
I will try to prepare a minimal example. Did you deploy in a docker container? I'm running it with one taskmanager container and one jobmanager under Kubernetes, similar to this: ci.apache.org/projects/flink/flink-docs-release-1.4/ops/…Lage Ragnarsson
@JoshuaDeWald I have edited the original post with an example project which reproduces this problem.Lage Ragnarsson
Nice! Based on everything here, it seems like you stumbled on a bug where perhaps the the Kafka connector is using the global Flink classloader (essentially, parent-only) rather than the user classloader that it itself has been loaded from. I misspoke above about having done the same thing, as I in fact used one of the built-in extractors, which would have been loaded from the main classloader.Joshua DeWald

1 Answers

1
votes

I think you've stumbled on a bug introduced in Flink 1.4.1: https://issues.apache.org/jira/browse/FLINK-8741.

It will be fixed shortly in 1.4.2. You can try to test in on the 1.4.2.rc2:https://github.com/apache/flink/tree/release-1.4.2-rc2