
I would like to stream data to AWS S3 from a Flink job. Below is a link to a simple test app that streams data to S3.

https://github.com/dmiljkovic/test-flink-bucketingsink-s3

The code works from IntelliJ, and it also works when the jar is submitted to the local Flink "cluster" on my machine. The problem is that the job works only once: if it is submitted a second time, the stack trace below is produced. If I restart the cluster, the job works again, but only for the first submission.

org.apache.commons.logging.LogConfigurationException: java.lang.IllegalAccessError: tried to access class org.apache.commons.logging.impl.LogFactoryImpl$3 from class org.apache.commons.logging.impl.LogFactoryImpl (Caused by java.lang.IllegalAccessError: tried to access class org.apache.commons.logging.impl.LogFactoryImpl$3 from class org.apache.commons.logging.impl.LogFactoryImpl)
    at org.apache.commons.logging.impl.LogFactoryImpl.newInstance(LogFactoryImpl.java:637)
    at org.apache.commons.logging.impl.LogFactoryImpl.getInstance(LogFactoryImpl.java:336)
    at org.apache.commons.logging.impl.LogFactoryImpl.getInstance(LogFactoryImpl.java:310)
    at org.apache.commons.logging.LogFactory.getLog(LogFactory.java:685)
    at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:76)
    at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:102)
    at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:88)
    at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:96)
    at com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:26)
    at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:96)
    at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:158)
    at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:119)
    at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:389)
    at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:371)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:235)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1206)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalAccessError: tried to access class org.apache.commons.logging.impl.LogFactoryImpl$3 from class org.apache.commons.logging.impl.LogFactoryImpl
    at org.apache.commons.logging.impl.LogFactoryImpl.getParentClassLoader(LogFactoryImpl.java:700)
    at org.apache.commons.logging.impl.LogFactoryImpl.createLogFromClass(LogFactoryImpl.java:1187)
    at org.apache.commons.logging.impl.LogFactoryImpl.discoverLogImplementation(LogFactoryImpl.java:914)
    at org.apache.commons.logging.impl.LogFactoryImpl.newInstance(LogFactoryImpl.java:604)
    ... 26 more
Source: Collection Source -> S3BucketingSink_write__bbb_UID_1 -> (Sink: S3BucketingSink_write__bbb_UID_2, Sink: S3BucketingSink_write__bbb_UID_3) (1/1)
Adding details about which versions of Flink and Hadoop you are using would be useful. - kkrugler
Flink version is 1.4.1 - dejan

1 Answer


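This happens because "commons-logging" does not work well with Flink's inverted (child-first) classloading, in which classes from the application jar are loaded before those on the cluster's classpath.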

Two immediate workarounds are:

  • switch to parent-first classloading (set `classloader.resolve-order: parent-first` in `flink-conf.yaml`)
  • remove all Hadoop and commons-logging code from the application jar file
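To check whether a fat jar still bundles such classes, something like the following can help. It is a minimal sketch, not part of Flink: the class name `JarConflictCheck` and its prefix list are hypothetical and only cover the two package families named in the second workaround.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical helper (not part of Flink): lists class files in a job jar that
// belong to Hadoop or commons-logging and should come from the cluster instead.
public class JarConflictCheck {

    // Assumed prefixes, covering only the two package families named above.
    private static final String[] SUSPECT_PREFIXES = {
            "org/apache/hadoop/",
            "org/apache/commons/logging/"
    };

    // True if a jar entry is a class file under one of the suspect packages.
    public static boolean isSuspect(String entryName) {
        if (!entryName.endsWith(".class")) {
            return false;
        }
        for (String prefix : SUSPECT_PREFIXES) {
            if (entryName.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    // Scans every entry of the given jar and collects the suspect class files.
    public static List<String> findSuspectClasses(File jar) throws IOException {
        List<String> hits = new ArrayList<>();
        try (JarFile jarFile = new JarFile(jar)) {
            Enumeration<JarEntry> entries = jarFile.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (isSuspect(name)) {
                    hits.add(name);
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        List<String> hits = findSuspectClasses(new File(args[0]));
        hits.forEach(System.out::println);
        System.out.println(hits.size() + " suspect class file(s) found");
    }
}
```

Run it as `java JarConflictCheck target/my-job.jar` (the jar path is only an example). With Maven, such dependencies are typically given `provided` scope so they are not shaded into the jar; note that the cluster then has to supply Hadoop and the S3A filesystem itself.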

For Flink 1.4.2 and Flink 1.5, we make sure that "commons-logging" is always loaded parent-first. Special handling of commons-logging like this seems to be quite common in other projects (JBoss, Tomcat, etc. all do it).
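The idea behind that fix can be illustrated with a simplified child-first class loader that still delegates a fixed set of packages to its parent. This is a hypothetical sketch, not Flink's actual implementation; the class name `ChildFirstClassLoader` and the pattern list are assumptions. One plausible explanation for the `IllegalAccessError` is that the JVM places two classes in the same runtime package only if they share both package name and defining class loader, so if `LogFactoryImpl` and `LogFactoryImpl$3` end up defined by different loaders, package-private access between them fails.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of a child-first ("inverted") class loader: classes are
// looked up in the job jar first, except for packages that must always be
// loaded from the parent (cluster) class loader.
public class ChildFirstClassLoader extends URLClassLoader {

    // Assumed pattern list for illustration; not Flink's exact default list.
    private static final List<String> ALWAYS_PARENT_FIRST = Arrays.asList(
            "java.",
            "org.apache.flink.",
            "org.apache.hadoop.",
            "org.apache.commons.logging.");

    public ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    // True if the class must always come from the parent loader.
    public static boolean isParentFirst(String className) {
        for (String prefix : ALWAYS_PARENT_FIRST) {
            if (className.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                if (isParentFirst(name)) {
                    // Standard delegation: parent first, then this loader's URLs.
                    c = super.loadClass(name, false);
                } else {
                    try {
                        // Inverted delegation: look in the job jar first.
                        c = findClass(name);
                    } catch (ClassNotFoundException e) {
                        // Not in the job jar, so fall back to the parent.
                        c = super.loadClass(name, false);
                    }
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }
}
```

With `org.apache.commons.logging.` in the always-parent-first list, every job resolves commons-logging from the single copy on the cluster's classpath, even if its own jar bundles another one, which matches the parent-first handling described above.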