I am running a simple WordCount program. Spark Streaming is watching a directory in HDFS for new files and should process them as they come in.
I start my streaming job, add a bunch of small files to a temporary HDFS directory, and then move those files into the watched HDFS directory (all with a simple shell command, hdfs dfs -mv). But my streaming job does not recognize them as new files and therefore does not process them (I have verified that the files really were moved).
Currently I am using textFileStream, but I am open to using fileStream instead (see the sketch after the code below). I have tried both Spark 1.3.1 and 1.4.0. I should mention that with Spark 1.0.x everything works: the new (moved) files are detected!
The code is:
//files are moved from /user/share/jobs-data/gstream/tmp to /user/share/jobs-data/gstream/streams, both directories are on HDFS.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

val sparkConf = new SparkConf().setAppName(this.getClass().getName())
sparkConf.setMaster(master)
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Milliseconds(1000))
val data = ssc.textFileStream(args(1)) //args(1) == /user/share/jobs-data/gstream/streams
val words = data.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
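
For reference, here is roughly what I would try with fileStream instead of textFileStream. This is only a sketch: the Hadoop new-API TextInputFormat types, the filter on hidden files, and the newFilesOnly = false flag are my guesses at what might pick up the moved files, not a confirmed fix.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// fileStream exposes a newFilesOnly flag that textFileStream does not;
// setting it to false asks Spark to also consider files whose modification
// time predates the current batch window, which may matter for files that
// were moved (not copied) into the watched directory.
val data = ssc.fileStream[LongWritable, Text, TextInputFormat](
  args(1),                                         // the watched directory
  (path: Path) => !path.getName().startsWith("."), // skip hidden/temp files
  newFilesOnly = false
).map(_._2.toString())                             // keep only the line text

The rest of the word-count pipeline would stay the same, since this produces a DStream[String] equivalent to what textFileStream returns.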
Does anyone have any ideas? Thanks!