I have a Flink session cluster (Job Manager + Task Manager), version 1.11.1, with configured log4j-console.properties to include Kafka appender. In addition, in both Job Manager and Task Manager I'm enabling flink-s3-fs-hadoop built-in plugin.
I've added kafka-clients jar to the flink/lib directory, which is necessary for the container to be running. But I'm still getting the below class loading error when the S3 plugin is being instantiated (and initializing the logger).
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArraySerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
(full stack trace at the bottom)
As I understood, there is a dedicated dynamic class loading for plugins, which is separated than the system class loading. Therefore, I added the following configurations to the flink-conf.yaml file:
classloader.parent-first-patterns.additional: org.apache.kafka
classloader.resolve-order: parent-first
but the error still appears.
While dubugging, I don't see the additional pattern being added to the "allowedFlinkPackages" of the Plugin Class Loader.
What am I missing here?
java.util.ServiceConfigurationError: org.apache.flink.core.fs.FileSystemFactory: Provider org.apache.flink.fs.s3hadoop.S3FileSystemFactory could not be instantiated
at java.util.ServiceLoader.fail(Unknown Source) ~[?:?]
at java.util.ServiceLoader$ProviderImpl.newInstance(Unknown Source) ~[?:?]
at java.util.ServiceLoader$ProviderImpl.get(Unknown Source) ~[?:?]
at java.util.ServiceLoader$3.next(Unknown Source) ~[?:?]
at org.apache.flink.core.plugin.PluginLoader$ContextClassLoaderSettingIterator.next(PluginLoader.java:103) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.shaded.guava18.com.google.common.collect.Iterators$5.next(Iterators.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.shaded.guava18.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:48) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.core.fs.FileSystem.addAllFactoriesToList(FileSystem.java:1068) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.core.fs.FileSystem.loadFileSystemFactories(FileSystem.java:1050) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.core.fs.FileSystem.initialize(FileSystem.java:325) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerSecurely(TaskManagerRunner.java:315) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:297) [flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: java.lang.ExceptionInInitializerError
at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
... 11 more
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArraySerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:728) ~[kafka-clients-2.5.0.jar:?]
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474) ~[kafka-clients-2.5.0.jar:?]
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467) ~[kafka-clients-2.5.0.jar:?]
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108) ~[kafka-clients-2.5.0.jar:?]
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129) ~[kafka-clients-2.5.0.jar:?]
at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:481) ~[kafka-clients-2.5.0.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326) ~[kafka-clients-2.5.0.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[kafka-clients-2.5.0.jar:?]
at org.apache.logging.log4j.core.appender.mom.kafka.DefaultKafkaProducerFactory.newKafkaProducer(DefaultKafkaProducerFactory.java:40) ~[log4j-core-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.core.appender.mom.kafka.KafkaManager.startup(KafkaManager.java:136) ~[log4j-core-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.core.appender.mom.kafka.KafkaAppender.start(KafkaAppender.java:164) ~[log4j-core-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.core.config.AbstractConfiguration.start(AbstractConfiguration.java:304) ~[log4j-core-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.core.LoggerContext.setConfiguration(LoggerContext.java:579) ~[log4j-core-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:651) ~[log4j-core-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:668) ~[log4j-core-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.core.LoggerContext.start(LoggerContext.java:253) ~[log4j-core-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:153) ~[log4j-core-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:45) ~[log4j-core-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.LogManager.getContext(LogManager.java:194) ~[log4j-api-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getContext(AbstractLoggerAdapter.java:138) ~[log4j-api-2.12.1.jar:2.12.1]
at org.apache.logging.slf4j.Log4jLoggerFactory.getContext(Log4jLoggerFactory.java:45) ~[log4j-slf4j-impl-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getLogger(AbstractLoggerAdapter.java:48) ~[log4j-api-2.12.1.jar:2.12.1]
at org.apache.logging.slf4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:30) ~[log4j-slf4j-impl-2.12.1.jar:2.12.1]
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:329) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:349) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.<clinit>(AbstractS3FileSystemFactory.java:88) ~[?:?]
at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
... 11 more
[2020-12-06T09:15:45,892][Error] {} [o.a.f.c.f.FileSystem]: Failed to load a file system via services
java.util.ServiceConfigurationError: org.apache.flink.core.fs.FileSystemFactory: Provider org.apache.flink.fs.s3hadoop.S3AFileSystemFactory could not be instantiated
at java.util.ServiceLoader.fail(Unknown Source) ~[?:?]
at java.util.ServiceLoader$ProviderImpl.newInstance(Unknown Source) ~[?:?]
at java.util.ServiceLoader$ProviderImpl.get(Unknown Source) ~[?:?]
at java.util.ServiceLoader$3.next(Unknown Source) ~[?:?]
at org.apache.flink.core.plugin.PluginLoader$ContextClassLoaderSettingIterator.next(PluginLoader.java:103) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.shaded.guava18.com.google.common.collect.Iterators$5.next(Iterators.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.shaded.guava18.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:48) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.core.fs.FileSystem.addAllFactoriesToList(FileSystem.java:1068) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.core.fs.FileSystem.loadFileSystemFactories(FileSystem.java:1050) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.core.fs.FileSystem.initialize(FileSystem.java:325) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerSecurely(TaskManagerRunner.java:315) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:297) [flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.fs.s3hadoop.S3FileSystemFactory
at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
... 11 more