2
votes

I am running a simple WordCount program. Spark Streaming is watching a directory in HDFS for new files and should process them as they come in.

I start my streaming job, add a bunch of small files to a tmp HDFS directory, and then move them into the watched HDFS directory (all with a simple shell -mv command). But the streaming job does not recognize them as new files and therefore does not process them (I checked that the files were moved correctly).

Currently I am using textFileStream but am open to using fileStream. I see this with Spark 1.3.1 and 1.4.0. Note that with Spark 1.0.x everything works: it detects the newly moved files.

The code is:

    // Files are moved from /user/share/jobs-data/gstream/tmp to /user/share/jobs-data/gstream/streams; both directories are on HDFS.
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}

    val sparkConf = new SparkConf().setAppName(this.getClass().getName())
    sparkConf.setMaster(master)
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Milliseconds(1000)) // 1-second batches
    val data = ssc.textFileStream(args(1)) // args(1) == /user/share/jobs-data/gstream/streams
    val words = data.flatMap(_.split(" "))
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()

Does anyone have any ideas? Thanks.

3
Please post your code. Otherwise we're just guessing what you've done - The Archetypal Paul
In fact, the code is a simple word count: val data = ssc.textFileStream(args(1)); val words = data.flatMap(_.split(" ")); val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _); wordCounts.print(); - Momog
Edit the code into your question, please. Are both your watched directory and tmp directory on the same file system? "Files must be written to the monitored directory by "moving" them from another location within the same file system". Maybe your tmp is somewhere different. - The Archetypal Paul
Ok, I edited my question and added some other details - Momog
I take it there's a ssc.start in there somewhere? And can you show the code that sets up ssc? - The Archetypal Paul

3 Answers

1
votes

I think the reason is that FileInputDStream uses the modification time to find new files. When a file is moved into the directory, its modification time does not change, so FileInputDStream does not detect it as new.

One way to solve this is to make sure the modification time changes, for example by using "copy" instead of "move".
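
If copying is not an option, another way to change the modification time might be to reset it just before the rename. The following is only a sketch using the Hadoop FileSystem API (setTimes and rename). The names TouchThenMove and touchThenMove are hypothetical. It assumes the clock of the machine running it roughly agrees with the clock of the Spark driver, and it has not been checked against the Spark versions in the question.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper, not part of Spark or Hadoop.
object TouchThenMove {

  /** Sets the modification time of `src` to "now", then renames it to `dst`.
    * Returns the result of the rename (false if it failed).
    */
  def touchThenMove(fs: FileSystem, src: Path, dst: Path): Boolean = {
    // mtime = current time; an atime of -1 leaves the access time unchanged.
    fs.setTimes(src, System.currentTimeMillis(), -1L)
    // Renaming within one file system keeps the mtime that was just set.
    fs.rename(src, dst)
  }

  def main(args: Array[String]): Unit = {
    // args(0): staged file in the tmp directory
    // args(1): target path inside the watched directory
    val fs = FileSystem.get(new Configuration())
    val moved = touchThenMove(fs, new Path(args(0)), new Path(args(1)))
    println(s"moved: $moved")
  }
}
```

Touching the file while it is still in the tmp directory means it appears in the watched directory already carrying a fresh modification time, so there is no moment at which the stream can see it with the old one.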

0
votes

You need to modify your code:

    val fstream = ssc
      .fileStream[LongWritable, Text, TextInputFormat](hdfsPath, (f: Path) => true, newFilesOnly = false)
      .map(pair => pair._2.toString)
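
For reference, here is a self-contained sketch of the same fileStream call with the imports it needs, wired into the word count from the question. The object name FileStreamWordCount and the helper splitWords are made up for illustration, and the master is assumed to be supplied by spark-submit. Whether newFilesOnly = false is enough to pick up moved files with an old modification time may depend on the Spark version; that is not verified here.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical driver object for illustration only.
object FileStreamWordCount {

  // Same tokenisation as in the question: split on single spaces.
  def splitWords(line: String): Seq[String] = line.split(" ").toSeq

  def main(args: Array[String]): Unit = {
    // args(0): the watched HDFS directory.
    val conf = new SparkConf().setAppName("FileStreamWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Accept every path and do not restrict the stream to new files only.
    val lines = ssc
      .fileStream[LongWritable, Text, TextInputFormat](
        args(0), (_: Path) => true, newFilesOnly = false)
      .map { case (_, text) => text.toString }

    val wordCounts = lines
      .flatMap(splitWords)
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Note that TextInputFormat here is the one from org.apache.hadoop.mapreduce.lib.input (the new Hadoop API), which is the kind fileStream expects.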
0
votes

I've just encountered a similar problem.

In my case the problem was that I copied the file before starting the StreamingContext, but it only picks up files created after it has started.

Try copying the file to a different folder after starting the StreamingContext, and then move it into the folder being watched.