2
votes

I am trying to get twitter popular tags using apache spark and scala. I am able to print the hashtags but when I start to count the hashtags using reduce function I am getting the following error

network.ConnectionManager: Selector thread was interrupted!

I am adding the code over here. Please help me to resolve this issue.

import java.io._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import StreamingContext._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._

object TwitterPopularTags {

  def main(args: Array[String]) {


    val (master, filters) = (args(0), args.slice(5, args.length))

    // Twitter Authentication credentials
    System.setProperty("twitter4j.oauth.consumerKey", "****")
    System.setProperty("twitter4j.oauth.consumerSecret","****")
    System.setProperty("twitter4j.oauth.accessToken", "****")
    System.setProperty("twitter4j.oauth.accessTokenSecret", "****")


    val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(10),
      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))

    val tweets = TwitterUtils.createStream(ssc, None)

    val statuses = tweets.map(status => status.getText())

    val words = statuses.flatMap(status => status.split(" "))
        val hashTags = words.filter(word => word.startsWith("#"))


     val counts = hashTags.map(tag => (tag, 1))
                         .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60 * 5), Seconds(10))

    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

[error] (run-main) java.lang.AssertionError: assertion failed: The checkpoint directory has not been set. Please use StreamingContext.checkpoint() or SparkContext.checkpoint() to set the checkpoint directory. java.lang.AssertionError: assertion failed: The checkpoint directory has not been set. Please use StreamingContext.checkpoint() or SparkContext.checkpoint() to set the checkpoint directory. at scala.Predef$.assert(Predef.scala:179) at org.apache.spark.streaming.dstream.DStream.validate(DStream.scala:181) at org.apache.spark.streaming.dstream.DStream$$anonfun$validate$10.apply(DStream.scala:227) at org.apache.spark.streaming.dstream.DStream$$anonfun$validate$10.apply(DStream.scala:227) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.streaming.dstream.DStream.validate(DStream.scala:227) at org.apache.spark.streaming.DStreamGraph$$anonfun$start$3.apply(DStreamGraph.scala:47) at org.apache.spark.streaming.DStreamGraph$$anonfun$start$3.apply(DStreamGraph.scala:47) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.streaming.DStreamGraph.start(DStreamGraph.scala:47) at org.apache.spark.streaming.scheduler.JobGenerator.startFirstTime(JobGenerator.scala:114) at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:75) at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:410) at TwitterPopularTags$.main(TwitterPopularTags.scala:77) at TwitterPopularTags.main(TwitterPopularTags.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) [trace] Stack trace suppressed: run last compile:run for the full output. 14/11/07 20:07:43 INFO dstream.NetworkReceiver$BlockGenerator: Block pushing thread interrupted 14/11/07 20:07:43 INFO network.ConnectionManager: Selector thread was interrupted! java.lang.RuntimeException: Nonzero exit code: 1 at scala.sys.package$.error(package.scala:27) [trace] Stack trace suppressed: run last compile:run for the full output. [error] (compile:run) Nonzero exit code: 1 [error] Total time: 41 s, completed Nov 7, 2014 8:07:43 PM

This is the error I am getting while trying to run the above code.

1
Well at first glance one thing that I notice is that your sliding interval is less than the batch size of the underling stream which is not supported. Could you post the full exception you get?Holden
Hi, I have attached the error log and also changed the sliding time interval which is now equal to the batch size. I am new to spark and my aim to get count of all the tweets which are occuring in twitter for every 10 seconds period of time. Please help me to proceed in that wayVamshi Kolanu
so to use the optimized reduce by key and window we need to set a checkpoint directory. This is so that Spark can keep track of some extra state information. You could either call sc.checkpoint and setup the checkpoint directory, or use the naive reducebykeyandwindow (leave out the _ - _ part).Holden

1 Answers

0
votes

You're using reduceByKeyAndWindow, which will force you to activate checkpointing in Spark. You can check how to perform this one-line operation here