0
votes

We are trying to use StreamingFileSink to write to a S3 bucket. Its a simple job which reads from Kafka and sinks to S3. The credentials for s3 are configured in the flink cluster. We are using flink 1.7.2 without pre bundled hadoop. As suggested in the documentation we have added the flink-s3-fs-hadoop jar to the lib directory of the flink cluster. When we run the job, we are getting this particular Kerberos exception. What are we doing wrong? Is there any configuration/jar that we are missing?

org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.KerberosAuthException: failure to login: javax.security.auth.login.LoginException: java.lang.NullPointerException: invalid null input: name
at com.sun.security.auth.UnixPrincipal.<init>(UnixPrincipal.java:71)
at com.sun.security.auth.module.UnixLoginModule.login(UnixLoginModule.java:133)
at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation$HadoopLoginContext.login(UserGroupInformation.java:1877)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.doSubjectLogin(UserGroupInformation.java:1789)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.createLoginUser(UserGroupInformation.java:704)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:654)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:565)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:247)
at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:125)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:317)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)

at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.doSubjectLogin(UserGroupInformation.java:1799)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.createLoginUser(UserGroupInformation.java:704)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:654)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:565)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:247)
at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:125)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:317)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)

Its a scala application and these are the dependencies added: Compile scope: Flink jars are 1.7.2 flink-java flink-streaming-java flink-streaming-scala flink-connector-kafka flink-parquet flink-avro parquet-avro -> 1.10.0

Provided scope: flink-shaded-hadoop2

1

1 Answers

0
votes

This happened because -> by default the container didn't have a username mapped to the id. And the java sun security module makes a native unix call to extract the username. Changed the Dockerfile to make a mapping between new random user id and user name.