4 votes

The Spark code below does not appear to perform any operation on the file example.txt:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

val conf = new org.apache.spark.SparkConf()
  .setMaster("local")
  .setAppName("filter")
  .setSparkHome("C:\\spark\\spark-1.2.1-bin-hadoop2.4")
  .set("spark.executor.memory", "2g")

val ssc = new StreamingContext(conf, Seconds(1))
val dataFile: DStream[String] = ssc.textFileStream("C:\\example.txt")

dataFile.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate

I'm attempting to print the first 10 elements of the file using dataFile.print().

Some of the generated output:

15/03/12 12:23:53 INFO JobScheduler: Started JobScheduler
15/03/12 12:23:54 INFO FileInputDStream: Finding new files took 105 ms
15/03/12 12:23:54 INFO FileInputDStream: New files at time 1426163034000 ms:

15/03/12 12:23:54 INFO JobScheduler: Added jobs for time 1426163034000 ms
15/03/12 12:23:54 INFO JobScheduler: Starting job streaming job 1426163034000 ms.0 from job set of time 1426163034000 ms
-------------------------------------------
Time: 1426163034000 ms
-------------------------------------------

15/03/12 12:23:54 INFO JobScheduler: Finished job streaming job 1426163034000 ms.0 from job set of time 1426163034000 ms
15/03/12 12:23:54 INFO JobScheduler: Total delay: 0.157 s for time 1426163034000 ms (execution: 0.006 s)
15/03/12 12:23:54 INFO FileInputDStream: Cleared 0 old files that were older than 1426162974000 ms: 
15/03/12 12:23:54 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/03/12 12:23:55 INFO FileInputDStream: Finding new files took 2 ms
15/03/12 12:23:55 INFO FileInputDStream: New files at time 1426163035000 ms:

15/03/12 12:23:55 INFO JobScheduler: Added jobs for time 1426163035000 ms
15/03/12 12:23:55 INFO JobScheduler: Starting job streaming job 1426163035000 ms.0 from job set of time 1426163035000 ms
-------------------------------------------
Time: 1426163035000 ms
-------------------------------------------

15/03/12 12:23:55 INFO JobScheduler: Finished job streaming job 1426163035000 ms.0 from job set of time 1426163035000 ms
15/03/12 12:23:55 INFO JobScheduler: Total delay: 0.011 s for time 1426163035000 ms (execution: 0.001 s)
15/03/12 12:23:55 INFO MappedRDD: Removing RDD 1 from persistence list
15/03/12 12:23:55 INFO BlockManager: Removing RDD 1
15/03/12 12:23:55 INFO FileInputDStream: Cleared 0 old files that were older than 1426162975000 ms: 
15/03/12 12:23:55 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/03/12 12:23:56 INFO FileInputDStream: Finding new files took 3 ms
15/03/12 12:23:56 INFO FileInputDStream: New files at time 1426163036000 ms:

example.txt is of the format:

gdaeicjdcg,194,155,98,107
jhbcfbdigg,73,20,122,172
ahdjfgccgd,28,47,40,178
afeidjjcef,105,164,37,53
afeiccfdeg,29,197,128,85
aegddbbcii,58,126,89,28
fjfdbfaeid,80,89,180,82

As the print documentation states:

/**
 * Print the first ten elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */

Does this mean that 0 RDDs have been generated for this stream? With Apache Spark, if I wanted to see the contents of an RDD I would use its collect function. Is there a similar method for DStreams? In short, how do I print the contents of the stream to the console?
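(If I understand it correctly, the closest DStream analogue of RDD.collect would be something like the sketch below, using foreachRDD; the take(10) is only there to mirror what print() does:)

// Sketch only: for each micro-batch RDD, pull up to 10 records back
// to the driver and print them - roughly what print() does internally
dataFile.foreachRDD { rdd =>
  rdd.take(10).foreach(println)
}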

Update:

Updated the code based on @0x0FFF's comment. http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html does not appear to give an example of reading from the local file system. Is this not as common as using Spark core, where there are explicit examples for reading data from a file?

Here is the updated code:

val conf = new org.apache.spark.SparkConf()
  .setMaster("local[2]")
  .setAppName("filter")
  .setSparkHome("C:\\spark\\spark-1.2.1-bin-hadoop2.4")
  .set("spark.executor.memory", "2g");

val ssc = new StreamingContext(conf, Seconds(1))
val dataFile: DStream[String] = ssc.textFileStream("file:///c:/data/")

dataFile.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate

But the output is the same. When I add new files to the c:\data directory (which have the same format as the existing data files), they are not processed. I assume dataFile.print should print the first 10 lines to the console?
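One way I could check whether any records are arriving at all would be to count each batch, along these lines (sketch only):

// Sketch: log how many records each micro-batch contains, to tell
// "the batches are empty" apart from "print() is not being called"
dataFile.foreachRDD { (rdd, time) =>
  println("Batch at " + time + " contains " + rdd.count() + " records")
}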

Update 2:

Perhaps this is related to the fact that I'm running this code in a Windows environment?


3 Answers

4 votes

You misunderstood the use of textFileStream. Here is its description from the Spark documentation:

Create a input stream that monitors a Hadoop-compatible filesystem for new files and reads them as text files (using key as LongWritable, value as Text and input format as TextInputFormat).

So first, you should pass it a directory, and second, this directory should be available from the node running the receiver, so it is better to use HDFS for this purpose. Then, when you put a new file into this directory, it will be processed by print() and its first 10 lines will be printed.
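In Scala it would look roughly like this (the directory path, app name and batch interval are only illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: monitor a directory (not a single file) for newly created files
val conf = new SparkConf().setMaster("local[2]").setAppName("textFileStreamDemo")
val ssc = new StreamingContext(conf, Seconds(30))

// only files created in this directory after the stream starts are picked up
val lines = ssc.textFileStream("hdfs:///tmp/streaming-input/")
lines.print() // prints up to 10 lines of each new batch

ssc.start()
ssc.awaitTermination()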

Update:

My code:

[alex@sparkdemo tmp]$ pyspark --master local[2]
Python 2.6.6 (r266:84292, Nov 22 2013, 12:16:22) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
15/03/12 06:37:49 WARN Utils: Your hostname, sparkdemo resolves to a loopback address: 127.0.0.1; using 192.168.208.133 instead (on interface eth0)
15/03/12 06:37:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.2.0
      /_/

Using Python version 2.6.6 (r266:84292, Nov 22 2013 12:16:22)
SparkContext available as sc.
>>> from pyspark.streaming import StreamingContext
>>> ssc = StreamingContext(sc, 30)
>>> dataFile = ssc.textFileStream('file:///tmp')
>>> dataFile.pprint()
>>> ssc.start()
>>> ssc.awaitTermination()
-------------------------------------------
Time: 2015-03-12 06:40:30
-------------------------------------------

-------------------------------------------
Time: 2015-03-12 06:41:00
-------------------------------------------

-------------------------------------------
Time: 2015-03-12 06:41:30
-------------------------------------------
1 2 3
4 5 6
7 8 9

-------------------------------------------
Time: 2015-03-12 06:42:00
-------------------------------------------
0 votes

Here is a custom receiver I wrote that listens for data in a specified directory:

package receivers

import java.io.File
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CustomReceiver(dir: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start a thread that reads the files in the directory
    new Thread("File Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // There is nothing much to do, as the thread calling receive()
    // is designed to stop by itself if isStopped() returns false
  }

  def recursiveListFiles(f: File): Array[File] = {
    val these = f.listFiles
    these ++ these.filter(_.isDirectory).flatMap(recursiveListFiles)
  }

  private def receive() {
    // Read every file under dir, push its lines into Spark via store(),
    // then restart the receiver (note that restart() fires after each file)
    for (f <- recursiveListFiles(new File(dir))) {

      val source = scala.io.Source.fromFile(f)
      val lines = source.getLines
      store(lines)
      source.close()
      logInfo("Stopped receiving")
      restart("Trying to connect again")

    }
  }
}

One thing I think to be aware of is that the files need to be processed in a time that is <= the configured batchDuration. In the example below it is set to 10 seconds, but if the time the receiver takes to process the files exceeds 10 seconds then some data files will not be processed. I'm open to correction on this point (see the listener sketch after the code below for one way to check it).

Here is how the custom receiver is used:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import receivers.CustomReceiver

val conf = new org.apache.spark.SparkConf()
  .setMaster("local[2]")
  .setAppName("filter")
  .setSparkHome("C:\\spark\\spark-1.2.1-bin-hadoop2.4")
  .set("spark.executor.memory", "2g")

val ssc = new StreamingContext(conf, Seconds(10))

val customReceiverStream: ReceiverInputDStream[String] = ssc.receiverStream(new CustomReceiver("C:\\data\\"))

customReceiverStream.print

customReceiverStream.foreachRDD(m => {
  println("size is " + m.collect.size)
})

ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
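To check the batchDuration concern above, one option is to log each batch's processing time and compare it to the 10 second interval; a sketch using a StreamingListener (the log format is arbitrary):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Sketch: print how long every completed batch took to process,
// so it can be compared against the configured batch interval.
// Register this before ssc.start().
ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val info = batchCompleted.batchInfo
    println("Batch " + info.batchTime + " processing took " +
      info.processingDelay.getOrElse(-1L) + " ms")
  }
})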

More info at: http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html and https://spark.apache.org/docs/1.2.0/streaming-custom-receivers.html

0 votes

I probably found your issue; you should have this in your log:

WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.

The problem is that you need at least 2 cores to run a Spark Streaming app. So the solution should be to simply replace:

val conf = new org.apache.spark.SparkConf()
 .setMaster("local")

with:

val conf = new org.apache.spark.SparkConf()
  .setMaster("local[*]")

Or at least local[n] with n greater than one.
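Put together with the original driver from the question, that would look roughly like this (paths unchanged from the question):

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: same driver as in the question, but with more than one local core
val conf = new org.apache.spark.SparkConf()
  .setMaster("local[*]") // or local[n] with n > 1
  .setAppName("filter")
  .set("spark.executor.memory", "2g")

val ssc = new StreamingContext(conf, Seconds(1))
val dataFile = ssc.textFileStream("file:///c:/data/")

dataFile.print()
ssc.start()
ssc.awaitTermination()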