1 vote

I was trying to run the Flink example code (flinkexamplesWordCount.jar) on a YARN cluster, but I am getting the security authentication error below.

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Cannot initialize task 'DataSink (CsvOutputFormat (path: hdfs://10.94.146.126:8020/user/qawsbtch/flink_out, delimiter:  ))': SIMPLE authentication is not enabled.  Available:[TOKEN, KERBEROS]

I am not sure where the issue is or what I am missing. I can run Spark or MapReduce jobs without any issue on the same Cloudera Hadoop cluster.

I updated the paths to hdfs-site.xml and core-site.xml in flink-conf.yaml (the same on both master and worker nodes) and also exported the HADOOP_CONF_DIR path. I also tried giving the host:port in the HDFS file path when executing the flink run command.
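The setup described above can be sketched as follows. The NameNode address and output path are taken from the error message; the Flink install directory, the config location, and the input path are assumptions for illustration only:

```shell
# Make the Hadoop client configuration visible to Flink
# (location is an assumption for a typical Cloudera install):
export HADOOP_CONF_DIR=/etc/hadoop/conf

# Submit the WordCount example with explicit host:port in the HDFS paths
# (input path is hypothetical; output path is from the question):
./bin/flink run ./examples/flinkexamplesWordCount.jar \
    hdfs://10.94.146.126:8020/user/qawsbtch/input \
    hdfs://10.94.146.126:8020/user/qawsbtch/flink_out
```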

ERROR MESSAGE

    22:14:25,138 ERROR   org.apache.flink.client.CliFrontend                           - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Cannot initialize task 'DataSink (CsvOutputFormat (path: hdfs://10.94.146.126:8020/user/qawsbtch/flink_out, delimiter:  ))': SIMPLE authentication is not enabled.  Available:[TOKEN, KERBEROS]
        at org.apache.flink.client.program.Client.run(Client.java:413)
        at org.apache.flink.client.program.Client.run(Client.java:356)
        at org.apache.flink.client.program.Client.run(Client.java:349)
        at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
        at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:78)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
        at org.apache.flink.client.program.Client.run(Client.java:315)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
        at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:873)
        at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:870)
        at org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:50)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
        at org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:47)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:870)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (CsvOutputFormat (path: hdfs://10.94.146.126:8020/user/qawsbtch/flink_out, delimiter:  ))': SIMPLE authentication is not enabled.  Available:[TOKEN, KERBEROS]
Can you post the full stack trace of the exception? I suspect Flink cannot access HDFS, but I would like to see the exact call where the problem is happening. – Robert Metzger
@rmetzger Thanks for your … – Aravind Thaipulley
@rmetzger The log contents are much bigger than will fit in the comment section here, which allows only about 600 characters. How do I send the entire log? – Aravind Thaipulley
Can you post it using a GitHub gist (gist.github.com)? – Robert Metzger

2 Answers

1 vote

(I had a private conversation with the author of the original question to figure out this solution)

The log files posted in the comments of the original question indicate that the job was submitted against a standalone installation of Flink. Standalone Flink currently only supports accessing Kerberos-secured HDFS if the user is authenticated on all worker nodes. With Flink on YARN, only the user starting the job on YARN needs to be authenticated with Kerberos.
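The difference can be sketched with shell commands. The Kerberos principals and keytab path below are assumptions, not details from the original thread:

```shell
# Standalone mode: EVERY worker node needs a valid Kerberos ticket, e.g.
# run on each TaskManager host before starting the cluster
# (hypothetical principal and keytab):
kinit -kt /etc/security/keytabs/flink.keytab flink@EXAMPLE.COM

# YARN mode: only the user submitting the session needs a ticket;
# YARN distributes the credentials to the containers
# (hypothetical principal; the yarn-session command is from the log below):
kinit qawsbtch@EXAMPLE.COM
./bin/yarn-session.sh -n 2
```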

Also, in the comment section, there was another issue:

robert@cdh544-worker-0:~/hd22/flink-0.9.0$ ./bin/yarn-session.sh -n 2
20:39:50,563 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at /0.0.0.0:8032
20:39:50,600 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Using values:
20:39:50,602 INFO  org.apache.flink.yarn.FlinkYarnClient                         -  TaskManager count = 2
20:39:50,602 INFO  org.apache.flink.yarn.FlinkYarnClient                         -  JobManager memory = 1024
20:39:50,602 INFO  org.apache.flink.yarn.FlinkYarnClient                         -  TaskManager memory = 1024
20:39:51,708 INFO  org.apache.hadoop.ipc.Client                                  - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
20:39:52,710 INFO  org.apache.hadoop.ipc.Client                                  - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
20:39:53,712 INFO  org.apache.hadoop.ipc.Client                                  - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
20:39:54,714 INFO  org.apache.hadoop.ipc.Client                                  - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 3 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)

The problem is that you are using Flink 0.9.0 (with Hadoop 2.2.0 included) on a cluster running Hadoop/YARN 2.6.0 with YARN HA enabled. Flink's old bundled Hadoop library (2.2.0) is not able to properly read the ResourceManager address from an HA configuration, which is why the client falls back to the default 0.0.0.0:8032 and keeps retrying in the log above.

Downloading the Flink binary built against Hadoop 2.6.0 will make it work.
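As a quick sanity check, you can confirm that the cluster really uses YARN HA and see which Hadoop version your Flink build bundles. The file locations below are assumptions for a typical CDH install:

```shell
# YARN HA replaces the single yarn.resourcemanager.address with per-RM ids:
grep -A1 "yarn.resourcemanager.ha.enabled" /etc/hadoop/conf/yarn-site.xml
grep -A1 "yarn.resourcemanager.ha.rm-ids" /etc/hadoop/conf/yarn-site.xml

# The Hadoop version bundled with the Flink build shows up in its lib directory:
ls flink-0.9.0/lib | grep -i hadoop
```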

0 votes

The issue you are facing might not be related to exporting HADOOP_CONF_DIR to make the Hadoop configuration files visible to Flink, but to the value of HADOOP_CONF_DIR itself! If you used Cloudera Manager, please make sure that the location you are referring to is correct and exists on all your nodes. It is also worth trying the common default location for Hadoop configuration files, /etc/hadoop/conf:

export HADOOP_CONF_DIR=/etc/hadoop/conf
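A minimal sketch to fail fast when the directory does not contain the expected client configs. The function name and the fallback path are illustrative, not part of the original answer:

```shell
#!/bin/sh
# Return success only if the directory holds the core Hadoop client configs.
check_conf_dir() {
  dir="$1"
  [ -f "$dir/core-site.xml" ] && [ -f "$dir/hdfs-site.xml" ]
}

# Check the exported value, falling back to the common default location:
if check_conf_dir "${HADOOP_CONF_DIR:-/etc/hadoop/conf}"; then
  echo "Hadoop config files found"
else
  echo "core-site.xml / hdfs-site.xml missing in ${HADOOP_CONF_DIR:-/etc/hadoop/conf}" >&2
fi
```

Running this on every node (for example over ssh) would catch the case where Cloudera Manager placed the files somewhere else on some hosts.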