I have a containerized Flink cluster with a standalone JobManager and two TaskManagers. Everything runs fine when I submit a fat jar containing my code and just the connectors I need, but a jar containing only my code fails, even though I have added the connector jars to /opt/flink/lib and the container processes show them on the classpath. The same failure occurs when I submit the job remotely from my IDE. Having to bundle a fat jar every time makes for a miserable development experience.
What do I need to do to make Flink use the additional jars I place in /opt/flink/lib for all new jobs?
Class loader exception:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
The Flink Dockerfile:
FROM flink:1.7.2
ADD http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.12/1.7.2/flink-connector-kafka_2.12-1.7.2.jar /opt/flink/lib/
ADD http://central.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.7.2/flink-avro-confluent-registry-1.7.2.jar /opt/flink/lib/
ADD http://central.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch6_2.12/1.7.2/flink-connector-elasticsearch6_2.12-1.7.2.jar /opt/flink/lib/
JobManager container:
root@2406b722dae1:/tmp# ps ax | more
PID TTY STAT TIME COMMAND
1 ? Ssl 1:01 /docker-java-home/jre/bin/java -Xms1024m -Xmx1024m -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml -classpath /opt/flink/lib/flink-avro-confluent-registry-1.7.2.jar:/opt/flink/lib/flink-connector-elasticsearch6_2.12-1.7.2.jar:/opt/flink/lib/flink-connector-kafka_2.12-1.7.2.jar:/opt/flink/lib/flink-python_2.12-1.7.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.12-1.7.2.jar::: org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint --configDir /opt/flink/conf --executionMode cluster
A TaskManager container:
root@bd1aa6e35b5a:/tmp# ps ax | more
PID TTY STAT TIME COMMAND
1 ? Ssl 0:28 /docker-java-home/jre/bin/java -XX:+UseG1GC -Xms922M -Xmx922M -XX:MaxDirectMemorySize=8388607T -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml -classpath /opt/flink/lib/flink-avro-confluent-registry-1.7.2.jar:/opt/flink/lib/flink-connector-elasticsearch6_2.12-1.7.2.jar:/opt/flink/lib/flink-connector-kafka_2.12-1.7.2.jar:/opt/flink/lib/flink-python_2.12-1.7.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.12-1.7.2.jar::: org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir /opt/flink/conf
/opt/flink/lib on both the running JobManager and TaskManagers:
root@bd1aa6e35b5a:/opt/flink/lib# ls -lh
total 84M
-rw------- 1 root root 2.7M Feb 11 16:25 flink-avro-confluent-registry-1.7.2.jar
-rw------- 1 root root 30K Feb 11 16:21 flink-connector-elasticsearch6_2.12-1.7.2.jar
-rw------- 1 root root 67K Feb 11 16:24 flink-connector-kafka_2.12-1.7.2.jar
-rw-r--r-- 1 flink flink 81M Feb 11 14:50 flink-dist_2.12-1.7.2.jar
-rw-r--r-- 1 flink flink 139K Feb 11 14:49 flink-python_2.12-1.7.2.jar
-rw-rw-r-- 1 flink flink 479K Feb 11 14:32 log4j-1.2.17.jar
-rw-rw-r-- 1 flink flink 9.7K Feb 11 14:32 slf4j-log4j12-1.7.15.jar
Just in case there is any doubt, the class is in the expected jar:
root@bd1aa6e35b5a:/opt/flink/lib# unzip -l flink-connector-kafka_2.12-1.7.2.jar | grep FlinkKafkaConsumer
14272 2019-02-12 00:24 org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.class
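The checks above show that the jar is listed on the classpath and contains the class, but not whether the JVM can actually open the jar and link the class. A minimal sketch of a probe for that, assuming it is run inside a container with the same classpath and as the same user as the Flink process: the class name ClasspathProbe and its two helper methods are hypothetical and not part of Flink. It reports a ClassNotFoundException separately from a LinkageError such as NoClassDefFoundError, which would point at a class the connector itself depends on rather than at the connector jar, and it checks whether each given jar is readable by the current user.

```java
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical diagnostic helper; not part of Flink.
public class ClasspathProbe {

    // Tries to load the class through the system class loader without initializing it.
    // Returns "OK" on success, otherwise the exception type and message.
    public static String probeClass(String className) {
        try {
            Class.forName(className, false, ClassLoader.getSystemClassLoader());
            return "OK";
        } catch (ClassNotFoundException | LinkageError e) {
            return e.getClass().getSimpleName() + ": " + e.getMessage();
        }
    }

    // True if the file exists and the current user is allowed to read it.
    public static boolean isReadable(String path) {
        return Files.isReadable(Paths.get(path));
    }

    public static void main(String[] args) {
        // First argument: class to probe (defaults to the class from the exception above).
        String className = args.length > 0
                ? args[0]
                : "org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer";
        System.out.println(className + " -> " + probeClass(className));

        // Remaining arguments: jar paths whose readability should be checked.
        for (int i = 1; i < args.length; i++) {
            System.out.println(args[i] + " readable: " + isReadable(args[i]));
        }
    }
}
```

Assuming the class was compiled beforehand and copied into the container, it could be run from its directory with something like java -classpath '/opt/flink/lib/*:.' ClasspathProbe org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer /opt/flink/lib/flink-connector-kafka_2.12-1.7.2.jar. Note that this only exercises the system class loader, so it says nothing about the per-job user-code class loader.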