I want to join events coming from a JSON-based Kafka source, which carry a url field, with related data in a MongoDB collection, then aggregate them (together with some additional MongoDB fields) and write the output to a GCS sink.
When I run my Structured Streaming Spark application, my cluster's disk space fills up without bound. I have configured the watermark to 0 seconds (since I only aggregate events from the current processing batch), so the state should be at most 1 or 2 batches. But this is my available disk space graph (it only becomes stable when I kill the app):
Almost all the data filling my HDDs is located under:
/hadoop/yarn/nm-local-dir/usercache/myuser/appcache/myapplication-id
If I disable the MongoDB join, the available disk space is stable over time, but I need the joined data.
The MongoDB collection I want to join is about 11 GB, and my input Kafka topic receives about 3k records/sec.
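In case it helps reason about the expected state size, this is roughly how the aggregation state can be inspected between micro-batches. It's a minimal sketch, not part of my app: query stands for the StreamingQuery handle returned by start(), which the code below doesn't actually keep.

// Minimal sketch: print state-store metrics of the last micro-batch.
// `query` is an assumed StreamingQuery handle (the real code below calls
// .start().awaitTermination() directly and never keeps the handle).
val progress = query.lastProgress
if (progress != null) {
  progress.stateOperators.foreach { op =>
    println(s"state rows: ${op.numRowsTotal}, updated: ${op.numRowsUpdated}, memory: ${op.memoryUsedBytes} B")
  }
}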
My code looks like this:
import com.mongodb.spark.MongoSpark
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
object Main {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("My awesome joined app")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      .set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
      .set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
      // Mongo config
      .set("spark.mongodb.input.uri", "mongodb://mongo/urls.urls")

    val session = SparkSession
      .builder
      .config(conf)
      .getOrCreate()
val topic = "events"
val kafkaStream = session
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", topic)
.option("startingOffsets" , "latest")
.option("maxOffsetsPerTrigger", 10000000)
.option("failOnDataLoss", false)
.option("kafka.max.partition.fetch.bytes", 10485760)
.option("kafka.receive.buffer.bytes", 16000000)
.load()
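    // Static (batch) DataFrame loaded from MongoDB; the streaming query below joins against it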
    val urlsDf = MongoSpark.load(session).toDF

    import session.implicits._
    kafkaStream
      .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS STRING)").as[(String, String)]
      .withColumn("name", json_tuple('value, "name"))
      .withColumn("url", json_tuple('value, "url"))
      .withColumn("a_name", when(col("name") === "a", 1).otherwise(0))
      .withColumn("b_name", when(col("name") === "b", 1).otherwise(0))
      .withColumn("date", json_tuple('value, "date"))
      // We don't care about reinjecting old data, so fake the watermark
      .withColumn("server_time", current_timestamp)
      .withWatermark("server_time", "0 seconds")
      .groupBy($"url", $"server_time")
      .agg(
        sum($"a_name") as "a_name",
        sum($"b_name") as "b_name"
      )
      // Join with entities
      .join(urlsDf, $"url" === $"_id", "left_outer")
      .select(
        "url",
        "some_value_from_mongo", // from the joined MongoDB data
        "a_name",
        "b_name"
      )
      .coalesce(24)
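      // Write each 10-minute micro-batch as parquet files to GCS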
      .writeStream
      .format("parquet")
      .outputMode("append")
      .option("path", "gs://my-custom-data")
      .option("checkpointLocation", "/my-custom-data/checkpoints")
      .trigger(Trigger.ProcessingTime(10 * 60 * 1000)) // 10 minutes
      .start()
      .awaitTermination()
  }
}