I'm new to Flink.
I can run the example WordCount.jar on a file in a remote HDFS cluster without setting the fs.hdfs.hadoopconf variable in the Flink configuration.
So I'm wondering what exactly the purpose of this variable is.
Does setting it change the way one runs the example jar?
Command:
flink-cluster.vm ~]$ /opt/flink/bin/flink run /opt/flink/examples/batch/WordCount.jar --input hdfs://hadoop-master:9000/tmp/test-events
Output:
.......
07/13/2016 00:50:13 Job execution switched to status FINISHED.
(foo,1)
.....
(bar,1)
(one,1)
Setup:
- Remote HDFS cluster on hdfs://hadoop-master.vm:9000
- Flink cluster running on flink-cluster.vm
Thanks
Update:
As Serhiy pointed out, I declared fs.hdfs.hadoopconf in the configuration, but when I ran the job with the updated argument hdfs:///tmp/test-events.1468374669125, I got the error shown below.
flink-conf.yaml:
# You can also directly specify the paths to hdfs-default.xml and hdfs-site.xml
# via keys 'fs.hdfs.hdfsdefault' and 'fs.hdfs.hdfssite'.
#
fs.hdfs.hadoopconf: hdfs://hadoop-master:9000/
fs.hdfs.hdfsdefault : hdfs://hadoop-master:9000/
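For comparison, the flink-conf.yaml comment above talks about paths to Hadoop's XML files, and Flink's configuration documentation describes fs.hdfs.hadoopconf as the absolute path of a local Hadoop configuration directory in which Flink looks for core-site.xml and hdfs-site.xml, rather than an hdfs:// URI. The Python sketch below is only an illustration under that assumption: it writes a minimal core-site.xml whose fs.defaultFS points at the NameNode from the setup above, then reads the value back. The helper names (write_hadoop_conf_dir, read_default_fs) and the temporary directory are hypothetical, and the code does not reproduce Flink's own loading logic.

```python
import os
import tempfile
import xml.etree.ElementTree as ET

# Minimal core-site.xml; the NameNode address is taken from the question's setup.
CORE_SITE = """<?xml version="1.0"?>
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://hadoop-master:9000</value>
  </property>
</configuration>
"""

def write_hadoop_conf_dir(conf_dir):
    """Create conf_dir (hypothetical location) and write core-site.xml into it."""
    os.makedirs(conf_dir, exist_ok=True)
    with open(os.path.join(conf_dir, "core-site.xml"), "w") as f:
        f.write(CORE_SITE)
    return conf_dir

def read_default_fs(conf_dir):
    """Return the fs.defaultFS value from conf_dir/core-site.xml, or None if absent."""
    tree = ET.parse(os.path.join(conf_dir, "core-site.xml"))
    for prop in tree.getroot().iter("property"):
        if prop.findtext("name") == "fs.defaultFS":
            return prop.findtext("value")
    return None

if __name__ == "__main__":
    conf_dir = write_hadoop_conf_dir(os.path.join(tempfile.mkdtemp(), "hadoop-conf"))
    # Under the documented meaning, the flink-conf.yaml entry would be a local directory path.
    print(f"fs.hdfs.hadoopconf: {conf_dir}")
    print("fs.defaultFS =", read_default_fs(conf_dir))
```

The point of the sketch is the shape of the value: a directory on the local file system that holds Hadoop's XML files, from which the NameNode address (fs.defaultFS) can be read.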
Command:
flink-cluster.vm ~]$ /opt/flink/bin/flink run /opt/flink/examples/batch/WordCount.jar --input hdfs:///tmp/test-events
Output:
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: The given HDFS file URI (hdfs:///tmp/test-events.1468374669125) did not describe the HDFS NameNode. The attempt to use a default HDFS configuration, as specified in the 'fs.hdfs.hdfsdefault' or 'fs.hdfs.hdfssite' config parameter failed due to the following problem: Either no default file system was registered, or the provided configuration contains no valid authority component (fs.default.name or fs.defaultFS) describing the (hdfs namenode) host and port.
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:679)
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1026)
... 19 more
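The error message hinges on whether the URI carries an authority component (the NameNode host and port). The Python sketch below, a simplified illustration and not Flink's implementation, contrasts the fully qualified URI from the first command with the scheme-only form hdfs:///tmp/test-events, and falls back to a default file system (the role fs.defaultFS plays) only when the authority is missing. The function name resolve_hdfs_uri is hypothetical.

```python
from urllib.parse import urlparse

def resolve_hdfs_uri(uri, default_fs=None):
    """Return a fully qualified hdfs:// URI.

    If the URI already names the NameNode (host:port), it is returned as-is.
    Otherwise the authority is taken from default_fs, mimicking the fallback
    the error message describes. Illustrative only.
    """
    parsed = urlparse(uri)
    if parsed.scheme != "hdfs":
        raise ValueError(f"not an hdfs URI: {uri}")
    if parsed.netloc:
        # e.g. hdfs://hadoop-master:9000/tmp/test-events: authority present.
        return uri
    if default_fs is None:
        # e.g. hdfs:///tmp/test-events with no default file system configured.
        raise ValueError(f"{uri} does not describe the HDFS NameNode "
                         "and no default file system is configured")
    default = urlparse(default_fs)
    if not default.netloc:
        # The default exists but has no host:port to borrow.
        raise ValueError(f"default file system {default_fs} has no authority component")
    return f"hdfs://{default.netloc}{parsed.path}"

print(resolve_hdfs_uri("hdfs://hadoop-master:9000/tmp/test-events"))
print(resolve_hdfs_uri("hdfs:///tmp/test-events", "hdfs://hadoop-master:9000"))
```

Without a usable default, the scheme-only form cannot be resolved, which matches the situation the error describes; the fully qualified form used in the first command never needs the fallback.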