I am trying to find popular Twitter hashtags using Apache Spark Streaming and Scala. I can print the hashtags, but when I start counting them with reduceByKeyAndWindow, I get the following error:
network.ConnectionManager: Selector thread was interrupted!
My code is below. Please help me resolve this issue.
import java.io._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import StreamingContext._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._

object TwitterPopularTags {
  def main(args: Array[String]) {
    val (master, filters) = (args(0), args.slice(5, args.length))

    // Twitter Authentication credentials
    System.setProperty("twitter4j.oauth.consumerKey", "****")
    System.setProperty("twitter4j.oauth.consumerSecret", "****")
    System.setProperty("twitter4j.oauth.accessToken", "****")
    System.setProperty("twitter4j.oauth.accessTokenSecret", "****")

    val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(10),
      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))

    val tweets = TwitterUtils.createStream(ssc, None)
    val statuses = tweets.map(status => status.getText())
    val words = statuses.flatMap(status => status.split(" "))
    val hashTags = words.filter(word => word.startsWith("#"))
    val counts = hashTags.map(tag => (tag, 1))
      .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60 * 5), Seconds(10))

    counts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
[error] (run-main) java.lang.AssertionError: assertion failed: The checkpoint directory has not been set. Please use StreamingContext.checkpoint() or SparkContext.checkpoint() to set the checkpoint directory.
java.lang.AssertionError: assertion failed: The checkpoint directory has not been set. Please use StreamingContext.checkpoint() or SparkContext.checkpoint() to set the checkpoint directory.
    at scala.Predef$.assert(Predef.scala:179)
    at org.apache.spark.streaming.dstream.DStream.validate(DStream.scala:181)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$validate$10.apply(DStream.scala:227)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$validate$10.apply(DStream.scala:227)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.streaming.dstream.DStream.validate(DStream.scala:227)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$3.apply(DStreamGraph.scala:47)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$3.apply(DStreamGraph.scala:47)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.streaming.DStreamGraph.start(DStreamGraph.scala:47)
    at org.apache.spark.streaming.scheduler.JobGenerator.startFirstTime(JobGenerator.scala:114)
    at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:75)
    at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:410)
    at TwitterPopularTags$.main(TwitterPopularTags.scala:77)
    at TwitterPopularTags.main(TwitterPopularTags.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
[trace] Stack trace suppressed: run last compile:run for the full output.
14/11/07 20:07:43 INFO dstream.NetworkReceiver$BlockGenerator: Block pushing thread interrupted
14/11/07 20:07:43 INFO network.ConnectionManager: Selector thread was interrupted!
java.lang.RuntimeException: Nonzero exit code: 1
    at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last compile:run for the full output.
[error] (compile:run) Nonzero exit code: 1
[error] Total time: 41 s, completed Nov 7, 2014 8:07:43 PM
This is the error I am getting while trying to run the above code.
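Reading the trace, the "Selector thread was interrupted!" line looks like a shutdown message rather than the root cause. The first error is the AssertionError saying that no checkpoint directory has been set. The form of reduceByKeyAndWindow that takes an inverse function (`_ - _`) keeps state from the previous window, and Spark Streaming requires checkpointing to be enabled for it. Below is a minimal sketch of the same job with a checkpoint directory set before ssc.start(). The directory name "checkpoint" and the object name TwitterPopularTagsWithCheckpoint are assumptions for illustration only; the StreamingContext constructor call is copied from the code above and may need adjusting for other Spark versions.

```scala
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.twitter._

// Hypothetical object name; otherwise mirrors the job in the question.
object TwitterPopularTagsWithCheckpoint {
  def main(args: Array[String]): Unit = {
    val master = args(0)

    // The twitter4j.oauth.* system properties are assumed to be set here,
    // exactly as in the original code.

    // Constructor call copied from the question.
    val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(10),
      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))

    // Assumption: "checkpoint" is a placeholder directory name. On a cluster
    // this would normally point at fault-tolerant storage such as HDFS.
    ssc.checkpoint("checkpoint")

    // Extract hashtags from the text of each incoming tweet.
    val hashTags = TwitterUtils.createStream(ssc, None)
      .flatMap(status => status.getText.split(" "))
      .filter(_.startsWith("#"))

    // Windowed count using the inverse function; this is the operation
    // that needs the checkpoint directory set above.
    val counts = hashTags.map(tag => (tag, 1))
      .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60 * 5), Seconds(10))

    counts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

If incremental updating is not needed, the overload without the inverse function, for example reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60 * 5), Seconds(10)), should avoid the checkpoint requirement, at the cost of recomputing each full window.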