I know there have been quite a few questions on this, but I've created a simple example that I thought should work,but still does not and I'm not sure I understand why
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
val streamingContext = new StreamingContext(sparkConf, Seconds(5))
val kafkaDStream = KafkaUtils.createStream(streamingContext,"hubble1:2181","aaa",Map("video"->3))
val wordDStream = kafkaDStream.flatMap(t=>t._2.split(" "))
val mapDStream = wordDStream.map((_,1))
val wordToSumDStream = mapDStream.updateStateByKey{
case (seq,buffer) => {
val sum = buffer.getOrElse(0) + seq.sum
Option(sum)
}
}
wordToSumDStream.print()
streamingContext.start()
streamingContext.awaitTermination()
}
Error:(41, 41) missing parameter type for expanded function
The argument types of an anonymous function must be fully known. (SLS 8.5)
Expected type was: (Seq[Int], Option[?]) => Option[?]
val result = flat.updateStateByKey{
Can someone explain why the mapDStream.updateStateByKey method statement does not compile?