
I know there have been quite a few questions on this, but I've created a simple example that I thought should work. It still does not compile, and I'm not sure I understand why.

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")

    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    val kafkaDStream = KafkaUtils.createStream(streamingContext,"hubble1:2181","aaa",Map("video"->3))

    val wordDStream = kafkaDStream.flatMap(t=>t._2.split("  "))

    val mapDStream = wordDStream.map((_,1))

    val wordToSumDStream = mapDStream.updateStateByKey{
      case (seq,buffer) => {
        val sum = buffer.getOrElse(0) + seq.sum
        Option(sum)
      }
    }

    wordToSumDStream.print()

    streamingContext.start()

    streamingContext.awaitTermination()
  }

Compiling this fails with the following error:

Error:(41, 41) missing parameter type for expanded function
The argument types of an anonymous function must be fully known. (SLS 8.5)
Expected type was: (Seq[Int], Option[?]) => Option[?]
      val result = flat.updateStateByKey{

Can someone explain why the mapDStream.updateStateByKey call does not compile?

1 Answer


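Put your logic inside a named method with explicit parameter types, like below. The state type of updateStateByKey is a type parameter that appears only in the update function, so with a bare `case (seq, buffer) => ...` block the compiler has nothing to infer it from (hence the `Option[?]` in the error message).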

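  // Explicit parameter and return types let the compiler infer the state type as Int.
  def update(seq: Seq[Int], buffer: Option[Int]): Option[Int] =
    Some(buffer.getOrElse(0) + seq.sum)
  val wordToSumDStream = mapDStream.updateStateByKey(update)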
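If you would rather keep the logic inline, two other approaches should also work: annotate the lambda parameters, or pass the state type explicitly. Here is a minimal sketch that runs without Spark. Note that `foldBatches` and `UpdateFuncSketch` are hypothetical names made up for this illustration; `foldBatches` only mimics the shape of updateStateByKey's function parameter and is not part of any Spark API.

```scala
object UpdateFuncSketch {
  // Hypothetical stand-in for updateStateByKey: the state type S appears only
  // in the function parameter, so it cannot be inferred from other arguments.
  def foldBatches[S](batches: Seq[Seq[Int]])(
      updateFunc: (Seq[Int], Option[S]) => Option[S]): Option[S] =
    batches.foldLeft(Option.empty[S])((state, batch) => updateFunc(batch, state))

  // Three micro-batches of counts for a single key.
  val batches: Seq[Seq[Int]] = Seq(Seq(1, 1), Seq(1), Seq(1, 1, 1))

  // Option 1: annotate the lambda parameters so S can be inferred as Int.
  val annotated: Option[Int] =
    foldBatches(batches)((seq: Seq[Int], buffer: Option[Int]) =>
      Some(buffer.getOrElse(0) + seq.sum))

  // Option 2: give the type argument explicitly; the `case` form then has
  // fully known parameter types.
  val explicit: Option[Int] =
    foldBatches[Int](batches) {
      case (seq, buffer) => Some(buffer.getOrElse(0) + seq.sum)
    }

  def main(args: Array[String]): Unit = {
    println(annotated)
    println(explicit)
  }
}
```

Applied to the question's code, the second option would presumably look like `mapDStream.updateStateByKey[Int] { case (seq, buffer) => ... }`, which keeps the original pattern-matching style.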
