
How can I stream files already present in HDFS using Apache Spark?

I have a very specific use case: I have millions of customer records and I want to process them at the customer level using Spark Streaming. What I am currently doing is taking the entire customer dataset, repartitioning it on customerId into 100 partitions, so that all records for a given customer land in the same partition and are passed through a single stream together.
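
Roughly, this is the repartitioning step I mean (a sketch assuming the DataFrame API; paths and column names are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("RepartitionCustomers").getOrCreate()

// Read the raw customer data and repartition on customerId into 100 partitions
// so that all records of one customer end up in the same partition
val customers = spark.read.parquet("hdfs:///tmp/raw_customers")
customers
  .repartition(100, customers("customerId"))
  .write
  .parquet("hdfs:///tmp/dataset")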

Now I have all the data present at the HDFS location

hdfs:///tmp/dataset

Using this HDFS location I want to set up a stream that reads the Parquet files and gives me the dataset. I have tried the following, but with no luck.

// start stream
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("StreamApp")

// Create the context with a 60 second batch interval
val ssc = new StreamingContext(sparkConf, Seconds(60))

// This is a plain batch read, not a stream
val dstream = ssc.sparkContext.textFile("hdfs:///tmp/dataset")

println("dstream: " + dstream)
println("dstream count: " + dstream.count())
println("dstream context: " + dstream.context)

ssc.start()
ssc.awaitTermination()


NOTE: This doesn't stream the data; it just reads it from HDFS once.

and

// start stream
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("StreamApp")

// Create the context with a 60 second batch interval
val ssc = new StreamingContext(sparkConf, Seconds(60))

// Monitor the directory for new files
val dstream = ssc.textFileStream("hdfs:///tmp/dataset")

println("dstream: " + dstream)
println("dstream count: " + dstream.count())
println("dstream context: " + dstream.context)

dstream.print()
ssc.start()
ssc.awaitTermination()

I always get 0 results. Is it possible to stream files from HDFS when they are already present there and no new files are being published?

The easiest approach is to move the files into hdfs:///tmp/dataset after the streaming context is started. – shanmuga

1 Answer


TL;DR This functionality is not supported in Spark as of now. The closest you can get is to move the files into hdfs:///tmp/dataset after starting the streaming context.
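
A minimal sketch of that workaround (directory names are illustrative): start the context watching a directory that is still empty, then move the existing files into it.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("StreamApp")
val ssc = new StreamingContext(sparkConf, Seconds(60))

// Watch a directory that is still empty when the context starts
val dstream = ssc.textFileStream("hdfs:///tmp/dataset")
dstream.print()

ssc.start()
// Only after the context is running, move the existing files into the
// watched directory, e.g. from a shell:
//   hdfs dfs -mv /tmp/dataset_staging/* /tmp/dataset/
ssc.awaitTermination()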


textFileStream internally uses FileInputDStream, which has an option newFilesOnly. But even then it does not process all existing files, only the files that were modified within one minute (set by the config value spark.streaming.fileStream.minRememberDuration) before the streaming context started. As described in the JIRA issue:

when you set the newFilesOnly to false, it means this FileInputDStream would not only handle coming files, but also include files which came in the past 1 minute (not all the old files). The length of time defined in FileInputDStream.MIN_REMEMBER_DURATION.
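
If you want to experiment with that window, here is a sketch using fileStream directly and widening the remember duration (the 600s value is illustrative):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Widen the window in which older files are still remembered
val sparkConf = new SparkConf()
  .setAppName("StreamApp")
  .set("spark.streaming.fileStream.minRememberDuration", "600s")

val ssc = new StreamingContext(sparkConf, Seconds(60))

// newFilesOnly = false also picks up files modified within the remembered window
val dstream = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "hdfs:///tmp/dataset",
  (path: Path) => true,   // accept every file
  newFilesOnly = false
).map(_._2.toString)

dstream.print()
ssc.start()
ssc.awaitTermination()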

Or

You could create a (normal) RDD out of the existing files before you start the streaming context, and then use it alongside the streamed RDDs later.
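
For example, something along these lines (directory names are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("StreamApp")
val ssc = new StreamingContext(sparkConf, Seconds(60))

// Existing files, read once as a normal RDD before the stream starts
val existing = ssc.sparkContext.textFile("hdfs:///tmp/dataset")

// New files arriving later in a separate directory
val incoming = ssc.textFileStream("hdfs:///tmp/incoming")

// Combine every streamed batch with the pre-loaded RDD
val combined = incoming.transform(batchRdd => batchRdd.union(existing))
combined.print()

ssc.start()
ssc.awaitTermination()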