How can I combine the datasets from each Spark input stream into one before applying transformations? I am using Spark 2.0.0.
val ssc = new StreamingContext(sc, Seconds(2))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._   // needed for the $"accountId" column syntax

val lines = ssc.textFileStream("input")

lines.foreachRDD { rdd =>
  val count = rdd.count()
  if (count > 0) {
    val dataSet = sqlContext.read.json(rdd)
    // all distinct accountIds in this batch
    val accountIds = dataSet.select("accountId").distinct.collect.flatMap(_.toSeq)
    // one DataFrame per accountId
    val accountIdArry = accountIds.map(accountId => dataSet.where($"accountId" <=> accountId))
    accountIdArry.foreach { arrEle =>
      print(arrEle.count)
      arrEle.show
      arrEle.write.format("json").save("output")
    }
  }
}
I want to write only the records whose count per accountId is greater than 100000 into the output file, taking all input streams into account. For that I want to merge all the DStreams into one before performing the transformations.
Currently it writes all records to the output file. Any help?
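To make the goal concrete, here is a rough sketch of the filtering I have in mind (assuming `import sqlContext.implicits._` is in scope and that the 100000 threshold is checked per micro-batch; whether it should instead be cumulative across batches is part of what I am unsure about):

lines.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val dataSet = sqlContext.read.json(rdd)
    // accountIds with more than 100000 records in this batch
    val bigAccountIds = dataSet.groupBy("accountId").count()
      .where($"count" > 100000)
      .select("accountId")
      .collect()
      .map(_.getString(0))
    // write only the records belonging to those accountIds
    dataSet.where($"accountId".isin(bigAccountIds: _*))
      .write.mode("append").format("json").save("output")
  }
}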
Updated
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
    at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$updateStateByKey$3.apply(PairDStreamFunctions.scala:433)
    at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$updateStateByKey$3.apply(PairDStreamFunctions.scala:432)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:682)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:264)
    at org.apache.spark.streaming.dstream.PairDStreamFunctions.updateStateByKey(PairDStreamFunctions.scala:432)
    at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$updateStateByKey$1.apply(PairDStreamFunctions.scala:400)
    at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$updateStateByKey$1.apply(PairDStreamFunctions.scala:400)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:682)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:264)
    at org.apache.spark.streaming.dstream.PairDStreamFunctions.updateStateByKey(PairDStreamFunctions.scala:399)
    at SparkExample$.main(:60)
    ... 56 elided
Caused by: java.io.NotSerializableException: SparkExample$
Serialization stack:
    - object not serializable (class: SparkExample$, value: SparkExample$@ab3b54)
    - field (class: SparkExample$$anonfun$5, name: $outer, type: class SparkExample$)
    - object (class SparkExample$$anonfun$5, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 74 more
SparkExample.scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import play.api.libs.json._
import org.apache.spark.sql._
import org.apache.spark.streaming.dstream._

object SparkExample {

  def main(inputDir: String) {
    val ssc = new StreamingContext(sc, Seconds(2))
    val sqlContext = new SQLContext(sc)

    val lines: DStream[String] = ssc.textFileStream(inputDir)
    val jsonLines = lines.map[JsValue](l => Json.parse(l))

    val accountIdLines = jsonLines.map[(String, JsValue)](json => {
      val accountId = (json \ "accountId").as[String]
      (accountId, json)
    })

    val accountIdCounts = accountIdLines
      .map[(String, Long)]({ case (accountId, json) => (accountId, 1) })
      .reduceByKey((a, b) => a + b)

    // this DStream[(String, Long)] will have the current accumulated count per accountId
    val updatedAccountCounts = accountIdCounts
      .updateStateByKey(updatedCountOfAccounts _)
  }

  def updatedCountOfAccounts(a: Seq[Long], b: Option[Long]): Option[Long] = {
    b.map(i => i + a.sum).orElse(Some(a.sum))
  }
}
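The NotSerializableException appears to come from the eta-expanded method reference (updatedCountOfAccounts _) capturing the enclosing SparkExample object as $outer. A possible workaround, sketched below and not verified here, is to pass a plain function value instead of the method reference:

// Sketch of a workaround: a function value that references no members of the
// enclosing object does not drag SparkExample along, so there is nothing
// non-serializable for Spark's closure cleaner to reject.
val updatedCountOfAccounts: (Seq[Long], Option[Long]) => Option[Long] =
  (newCounts, currentCount) =>
    currentCount.map(_ + newCounts.sum).orElse(Some(newCounts.sum))

val updatedAccountCounts = accountIdCounts.updateStateByKey(updatedCountOfAccounts)

Declaring object SparkExample extends Serializable may also get past this particular exception. Note that updateStateByKey additionally requires a checkpoint directory (ssc.checkpoint(...)) to be set before the context is started.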
Have you looked at union? It can be used to union together different streams (or multiple readers on the same stream to increase parallelism). – Glennie Helles Sindholt
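For reference, a minimal sketch of that suggestion applied here, assuming the inputs arrive in several separate directories (the directory names below are made up):

// Merge the per-directory DStreams into one before any transformation.
val dirs = Seq("input1", "input2", "input3")
val streams: Seq[DStream[String]] = dirs.map(ssc.textFileStream)
val merged: DStream[String] = ssc.union(streams)
// The JSON parsing and per-accountId counting then run once on `merged`
// instead of once per input stream.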