
I'm currently trying to write my own implementation of an Apache Spark 2.0 DStream:

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

class MQTTDStream[T](ssc: StreamingContext) extends DStream(ssc) {
  override def compute(validTime: Time): RDD[Int] = {
    ssc.sparkContext.parallelize(Array(1, 2, 3), 1)
  }
}

The compiler in my Eclipse environment is fine with this, but when I paste the code into a Jupyter notebook in IBM Data Science Experience I get the following error:

Name: Compile Error
Message: :21: error: overriding method compute in class DStream of type (validTime: org.apache.spark.streaming.Time)Option[org.apache.spark.rdd.RDD[Nothing]];
 method compute has incompatible type
       override def compute(validTime: Time): RDD[Int] = { ssc.sparkContext.parallelize(Array(1, 2, 3), 1) }
                    ^
:20: error: class MQTTDStream needs to be abstract, since:
it has 2 unimplemented members.
/** As seen from class MQTTDStream, the missing signatures are as follows.
 *  For convenience, these are usable as stub implementations.
 */
  def dependencies: List[org.apache.spark.streaming.dstream.DStream[_]] = ???
  def slideDuration: org.apache.spark.streaming.Duration = ???

       class MQTTDStream[T](ssc: StreamingContext) extends DStream(ssc) {
             ^
StackTrace:

EDIT: 31.8.16

Now I've progressed a bit:

abstract class MQTTDStream[T](ssc: StreamingContext) extends DStream(ssc) {
  override def compute(validTime: Time): Option[RDD[T]] =
    Some(ssc.sparkContext.parallelize(Seq(1, 2, 3), 1))

  override def dependencies = Nil

  override def slideDuration = Seconds(1) // just an example
}

This gives me:

type mismatch; found : Int(1) required: T
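The mismatch is easier to see without Spark: an overriding member declared to return the type parameter T cannot return a hard-coded Int, because T could be instantiated to anything. A minimal self-contained sketch of the same situation (the Box/IntBox names are made up for illustration):

```scala
// A generic base class whose abstract member must produce a T,
// analogous to DStream[T].compute returning Option[RDD[T]].
abstract class Box[T] {
  def compute(): T
}

// Keeping T generic while returning the literal 1 does not compile,
// for the same reason as above ("found: Int(1), required: T"):
//   class BadBox[T] extends Box[T] { def compute(): T = 1 }

// Fixing the type argument in the extends clause resolves it:
class IntBox extends Box[Int] {
  def compute(): Int = 1
}
```

The same reasoning applies to the DStream subclass: either fix the element type to Int in the extends clause, or make the produced elements actually have type T.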

Option[org.apache.spark.rdd.RDD[Nothing]] smells wrong. Are you sure all the dependencies are properly loaded in Jupyter, and with the same Spark version? – Yuval Itzchakov

1 Answer

1. You are missing the type parameter for DStream (this is where Nothing in the error message comes from).
2. compute should return an Option[RDD[Something]], not just RDD[Something].
3. You also need to define dependencies and slideDuration.

So the minimal change would be:

class MQTTDStream[T](ssc: StreamingContext) extends DStream[Int](ssc) {
  override def compute(validTime: Time): Option[RDD[Int]] = 
    Some(ssc.sparkContext.parallelize(Array(1, 2, 3), 1))

  override def dependencies = Nil

  override def slideDuration = Seconds(1) // just an example
}
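If you do want to keep the element type generic, one option (a sketch only, not tested against a running Spark context) is to pass the elements in from outside and require a ClassTag, which both DStream and parallelize need. The data constructor parameter here is an assumption added for illustration, not part of the original question:

```scala
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext, Time}
import org.apache.spark.streaming.dstream.DStream

// Sketch: a generic variant that takes its elements as a constructor
// argument instead of hard-coding Int literals, so compute can return
// Option[RDD[T]] for any T.
class MQTTDStream[T: ClassTag](ssc: StreamingContext, data: Seq[T])
    extends DStream[T](ssc) {

  override def compute(validTime: Time): Option[RDD[T]] =
    Some(ssc.sparkContext.parallelize(data, 1))

  override def dependencies: List[DStream[_]] = Nil

  override def slideDuration: Duration = Seconds(1) // just an example
}
```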