3
votes

Newbie to Flink.
I am able to run the example wordcount.jar on a file present in remote hdfs cluster without declaring fs.hdfs.hadoopconf variable in flink conf.

So wondering what exactly is the purpose of above mentioned variable.
Does declaring it changes the way one runs the example jar ?

Command :

flink-cluster.vm ~]$ /opt/flink/bin/flink run  /opt/flink/examples/batch/WordCount.jar --input hdfs://hadoop-master:9000/tmp/test-events

Output:

.......
07/13/2016 00:50:13 Job execution switched to status FINISHED.
(foo,1)
.....
(bar,1)
(one,1)

Setup :

  • Remote HDFS cluster on hdfs://hadoop-master.vm:9000
  • Flink cluster on running on flink-cluster.vm

Thanks

Update :
As pointed out by Serhiy, declared fs.hdfs.hadoopconf in conf but on running the job with updated argument hdfs:///tmp/test-events.1468374669125 got the following error

flink-conf.yaml

# You can also directly specify the paths to hdfs-default.xml and hdfs-site.xml
# via keys 'fs.hdfs.hdfsdefault' and 'fs.hdfs.hdfssite'.
#
fs.hdfs.hadoopconf: hdfs://hadoop-master:9000/
fs.hdfs.hdfsdefault :  hdfs://hadoop-master:9000/

Command :

flink-cluster.vm ~]$ /opt/flink/bin/flink run  /opt/flink/examples/batch/WordCount.jar --input hdfs:///tmp/test-events

Output :

Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: The given HDFS file URI (hdfs:///tmp/test-events.1468374669125) did not describe the HDFS NameNode. The attempt to use a default HDFS configuration, as specified in the 'fs.hdfs.hdfsdefault' or 'fs.hdfs.hdfssite' config parameter failed due to the following problem: Either no default file system was registered, or the provided configuration contains no valid authority component (fs.default.name or fs.defaultFS) describing the (hdfs namenode) host and port.
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:679)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1026)
    ... 19 more
1

1 Answers

2
votes

From documentation:

fs.hdfs.hadoopconf: The absolute path to the Hadoop File System’s (HDFS) configuration directory (OPTIONAL VALUE). Specifying this value allows programs to reference HDFS files using short URIs (hdfs:///path/to/files, without including the address and port of the NameNode in the file URI). Without this option, HDFS files can be accessed, but require fully qualified URIs like hdfs://address:port/path/to/files. This option also causes file writers to pick up the HDFS’s default values for block sizes and replication factors. Flink will look for the “core-site.xml” and “hdfs-site.xml” files in the specified directory.