How can I combine the datasets from each Spark input stream into one before applying transformations? I am using Spark 2.0.0.

import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(2))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._ // needed for the $"..." column syntax
val lines = ssc.textFileStream("input")

lines.foreachRDD { rdd =>
  val count = rdd.count()
  if (count > 0) {
    val dataSet = sqlContext.read.json(rdd)
    val accountIds = dataSet.select("accountId").distinct.collect.flatMap(_.toSeq)
    // one DataFrame per distinct accountId in this batch
    val accountIdArry = accountIds.map(accountId => dataSet.where($"accountId" <=> accountId))
    accountIdArry.foreach { arrEle =>
      print(arrEle.count)
      arrEle.show
      arrEle.write.format("json").save("output")
    }
  }
}

I want to write only the records whose per-accountId count is greater than 100000 to the output file, taking all input streams into account. To do that, I want to merge all DStreams into one before performing the transformations.

Right now it writes all records to the output file. Any help?

Updated

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
  at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$updateStateByKey$3.apply(PairDStreamFunctions.scala:433)
  at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$updateStateByKey$3.apply(PairDStreamFunctions.scala:432)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.SparkContext.withScope(SparkContext.scala:682)
  at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:264)
  at org.apache.spark.streaming.dstream.PairDStreamFunctions.updateStateByKey(PairDStreamFunctions.scala:432)
  at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$updateStateByKey$1.apply(PairDStreamFunctions.scala:400)
  at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$updateStateByKey$1.apply(PairDStreamFunctions.scala:400)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.SparkContext.withScope(SparkContext.scala:682)
  at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:264)
  at org.apache.spark.streaming.dstream.PairDStreamFunctions.updateStateByKey(PairDStreamFunctions.scala:399)
  at SparkExample$.main(:60)
  ... 56 elided
Caused by: java.io.NotSerializableException: SparkExample$
Serialization stack:
  - object not serializable (class: SparkExample$, value: SparkExample$@ab3b54)
  - field (class: SparkExample$$anonfun$5, name: $outer, type: class SparkExample$)
  - object (class SparkExample$$anonfun$5, )
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
  ... 74 more

SparkExample.scala

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import play.api.libs.json._
import org.apache.spark.sql._
import org.apache.spark.streaming.dstream._


object SparkExample {
    def main(inputDir: String) {
        val ssc = new StreamingContext(sc, Seconds(2))
        val sqlContext = new SQLContext(sc)


        val lines: DStream[String] = ssc.textFileStream(inputDir)

        val jsonLines = lines.map[JsValue](l => Json.parse(l))

        val accountIdLines = jsonLines.map[(String, JsValue)](json => {
            val accountId = (json \ "accountId").as[String]
            (accountId, json)
        })

        val accountIdCounts = accountIdLines
            .map[(String, Long)]({ case (accountId, json) => {
            (accountId, 1)
        } })
        .reduceByKey((a, b) => a + b)


        // this DStream[(String, Long)] will have current accumulated count for accountId's
        val updatedAccountCounts = accountIdCounts
        .updateStateByKey(updatedCountOfAccounts _)
    }

    def updatedCountOfAccounts(a: Seq[Long], b: Option[Long]): Option[Long] = {
        b.map(i => i + a.sum).orElse(Some(a.sum))
    }
}
Have you looked at union? It can be used to union together different streams (or multiple readers on the same stream to increase parallelism). - Glennie Helles Sindholt
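
The union suggested in this comment could look roughly like the sketch below. The object name UnionSketch, the helper mergeInputs and the directory names in the usage line are made up for illustration. Keep in mind that union merges streams batch by batch; it does not accumulate records across batch intervals.

```scala
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

object UnionSketch {
  // Hypothetical helper: opens one text-file stream per directory
  // and merges them into a single DStream.
  def mergeInputs(ssc: StreamingContext, dirs: Seq[String]): DStream[String] = {
    val streams: Seq[DStream[String]] = dirs.map(dir => ssc.textFileStream(dir))
    ssc.union(streams) // for exactly two streams, a.union(b) does the same
  }
}
```

It would be called as UnionSketch.mergeInputs(ssc, Seq("inputA", "inputB")), where both directory names are placeholders.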

1 Answer

There are two things that you need to keep in mind.

First - since you are using a StreamingContext with a 2-second micro-batch interval, each RDD in your DStream contains only the data that arrived during those 2 seconds, not all the data. If you need to perform operations on all the data available at a given time, then streams are not the right fit for your problem.
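
To make the first point concrete, here is a plain-Scala sketch with no Spark involved. It treats each micro-batch as an ordinary collection and compares the per-batch counts with the count over everything seen so far. The batch contents are made-up data, and the names BatchVsTotal, countsPerBatch and countsOverall are hypothetical.

```scala
object BatchVsTotal {
  // Three hypothetical micro-batches of accountIds (made-up data).
  val batches: Seq[Seq[String]] = Seq(
    Seq("a", "a", "b"),
    Seq("a", "b"),
    Seq("a")
  )

  // Count per accountId within one batch: this is all a single foreachRDD call sees.
  def countsPerBatch(batch: Seq[String]): Map[String, Long] =
    batch.groupBy(identity).map { case (id, xs) => id -> xs.size.toLong }

  // Count per accountId across every batch.
  def countsOverall(all: Seq[Seq[String]]): Map[String, Long] =
    countsPerBatch(all.flatten)

  def main(args: Array[String]): Unit = {
    batches.map(countsPerBatch).foreach(println) // one small map per batch
    println(countsOverall(batches))              // totals over all batches
  }
}
```

A threshold check applied to the per-batch maps can miss an account whose records are spread over several batches, which is why the per-batch approach alone does not answer the question.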

Second - you do not need a SQLContext to deal with JSON. Just use any JSON library and group the RDD by accountId.

import org.apache.spark.streaming.{Seconds, StreamingContext}
import play.api.libs.json._

val ssc = new StreamingContext(sc, Seconds(2))
val dstreams = ssc.textFileStream("input")

dstreams.foreachRDD { rdd =>
  // parse each line as JSON, then group the records of this batch by accountId
  val jsonRdd = rdd.map(l => Json.parse(l))
  val grouped = jsonRdd.groupBy(json => (json \ "accountId").as[String])
}
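
As a possible next step within one batch, the sketch below keeps only the lines of accounts that occur more than a given number of times in that RDD. It assumes this is what "count greater than 100000 per accountId" means, and it only sees the records of the current batch. The names PerBatchFilter and linesOfBusyAccounts are hypothetical.

```scala
import org.apache.spark.rdd.RDD
import play.api.libs.json._

object PerBatchFilter {
  // Returns the raw JSON lines whose accountId occurs more than `threshold`
  // times in this RDD (that is, in this micro-batch only).
  def linesOfBusyAccounts(rdd: RDD[String], threshold: Long): RDD[String] = {
    // key every line by its accountId
    val keyed: RDD[(String, String)] =
      rdd.map(line => ((Json.parse(line) \ "accountId").as[String], line))
    // count lines per accountId and keep the accounts above the threshold
    val busy: RDD[(String, Long)] =
      keyed.mapValues(_ => 1L).reduceByKey(_ + _).filter { case (_, n) => n > threshold }
    // the join drops every line whose accountId is not in `busy`
    keyed.join(busy).map { case (_, (line, _)) => line }
  }
}
```

Inside foreachRDD this could be called as PerBatchFilter.linesOfBusyAccounts(rdd, 100000L). If the result is saved with saveAsTextFile, each batch needs its own output directory, because the call fails when the target path already exists.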

If you want to use updateStateByKey, then just stay with DStreams:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import play.api.libs.json._

val ssc = new StreamingContext(sc, Seconds(2))
// updateStateByKey requires a checkpoint directory; "checkpoint" is a placeholder path
ssc.checkpoint("checkpoint")

val lines: DStream[String] = ssc.textFileStream("inputPath")

val jsonLines = lines.map[JsValue](l => Json.parse(l))

val accountIdLines = jsonLines.map[(String, JsValue)](json => {
  val accountId = (json \ "accountId").as[String]
  (accountId, json)
})

// per-batch count of records for each accountId
val accountIdCounts = accountIdLines
  .map[(String, Long)]({ case (accountId, json) => (accountId, 1L) })
  .reduceByKey((a, b) => a + b)

// a: counts from the current batch, b: accumulated count so far (None for a new key)
def updatedCountOfAccounts(a: Seq[Long], b: Option[Long]): Option[Long] = {
  b.map(i => i + a.sum).orElse(Some(a.sum))
}

// this DStream[(String, Long)] will have the current accumulated count for each accountId
val updatedAccountCounts = accountIdCounts
  .updateStateByKey(updatedCountOfAccounts _)
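
To get from the accumulated counts to the accounts above the threshold, one option is to filter the stateful stream, as in the sketch below. The names AccumulatedCounts, updateCount and overThreshold are hypothetical. The update logic is written as a function value in a standalone object rather than as a method passed with a trailing underscore; this is an assumption about what may help with the "Task not serializable" error from the question, since the function then carries no reference to an enclosing object, but it has not been verified against that exact setup.

```scala
import org.apache.spark.streaming.dstream.DStream

object AccumulatedCounts extends Serializable {
  // Adds the counts of the current batch to the running total.
  // `running` is None the first time a key is seen.
  val updateCount: (Seq[Long], Option[Long]) => Option[Long] =
    (newCounts, running) => Some(running.getOrElse(0L) + newCounts.sum)

  // Keeps only the accountIds whose accumulated count exceeds `threshold`.
  def overThreshold(counts: DStream[(String, Long)], threshold: Long): DStream[(String, Long)] =
    counts.updateStateByKey(updateCount).filter { case (_, total) => total > threshold }
}
```

With the per-batch counts from above this would be AccumulatedCounts.overThreshold(accountIdCounts, 100000L). A checkpoint directory has to be set with ssc.checkpoint before the context is started, because updateStateByKey depends on it. Note that the state holds only the counts, not the records themselves.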