
I want to join events coming from a JSON-based Kafka source, which have a url field, with related data in a MongoDB collection, then aggregate them (including the additional MongoDB data) and write the result to a GCS sink.

When I run my Structured Streaming Spark application, the cluster keeps filling disk space without limit. I have configured the watermark to 0 seconds (since I'm only aggregating events from the current processing batch), so the maximum state should be one or two batches. Yet this is my available disk space graph (it only becomes stable when I kill the app):

[Disk usage graph]

Almost all the data filling my HDDs is located under: /hadoop/yarn/nm-local-dir/usercache/myuser/appcache/myapplication-id

If I disable the MongoDB join, the available disk space stays stable over time, but I need the joined data.

The MongoDB collection I want to join is about 11 GB, and my input Kafka topic receives about 3k records/sec.

My code looks like this:

import com.mongodb.spark.MongoSpark
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

object Main {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("My awesome joined app")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      .set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
      .set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")

      // Mongo config
      .set("spark.mongodb.input.uri", "mongodb://mongo/urls.urls")

    val session = SparkSession
      .builder
      .config(conf)
      .getOrCreate()

    val topic = "events"
    val kafkaStream = session
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("subscribe", topic)
      .option("startingOffsets" , "latest")
      .option("maxOffsetsPerTrigger", 10000000)
      .option("failOnDataLoss", false)
      .option("kafka.max.partition.fetch.bytes", 10485760)
      .option("kafka.receive.buffer.bytes", 16000000)
      .load()

    val urlsDf = MongoSpark.load(session).toDF

    import session.implicits._
    kafkaStream
      .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS STRING)").as[(String, String)]
      .withColumn("name", json_tuple('value, "name"))
      .withColumn("url", json_tuple('value, "url"))

      .withColumn("a_name", when(col("name") === "a", 1).otherwise(0))
      .withColumn("b_name", when(col("name") === "b", 1).otherwise(0))

      .withColumn("date", json_tuple('value, "date"))

      // We don't care about reinjecting old data, fake watermarking
      .withColumn("server_time", current_timestamp)
      .withWatermark("server_time", "0 seconds")

      .groupBy($"url", $"server_time")
      .agg(
        sum(s"a_name") as "a_name",
        sum(s"b_name") as "b_name"
      )

      // Join with entities
      .join(urlsDf, $"url" === $"_id", "left_outer")

      .select(
        "url",
        "some_value_from_mongo", // from the joined stream
        s"a_name",
        s"b_name"
      )

      .coalesce(24)
      .writeStream
      .format("parquet")
      .outputMode("append")
      .option("path", "gs://my-custom-data")
      .option("checkpointLocation", "/my-custom-data/checkpoints")
      .trigger(Trigger.ProcessingTime(10 * 60 * 1000)) // 10 minutes

      .start()
      .awaitTermination()
  }
}


1 Answer


The issue was solved by setting dynamic allocation to false.

To do that, you can pass the following conf to your spark-submit:

  --conf "spark.dynamicAllocation.enabled=false" \
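The same property can also be set programmatically if you prefer to keep it next to the rest of the configuration. Below is a minimal sketch based on the question's SparkConf setup; the executor count is an illustrative value, since with dynamic allocation disabled you normally fix the number of executors yourself (spark.executor.instances or --num-executors):

// Sketch: disable dynamic allocation in code instead of on the command line.
// The executor count below is an assumed example value, not a recommendation.
val conf = new SparkConf()
  .setAppName("My awesome joined app")
  .set("spark.dynamicAllocation.enabled", "false")
  .set("spark.executor.instances", "6") // fix the executor count explicitly

val session = SparkSession
  .builder
  .config(conf)
  .getOrCreate()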

There seems to be an issue with YARN or Spark not deleting unneeded state before the executor exits: https://issues.apache.org/jira/browse/YARN-7070.

Apart from that, joining an 11 GB, continuously growing MongoDB collection is questionable architecture: as the collection grows, you would need to keep adding Spark executors. In my case it also led to huge GC times because of heavy shuffling between workers.
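If the join has to stay, one possible direction (a sketch only, not part of the original fix) is to do it per micro-batch with foreachBatch (Spark 2.4+), so that only the MongoDB documents whose _id appears in the current batch take part in the shuffle. The names below (aggregatedStream for the streaming DataFrame after the .agg, plus url, _id and some_value_from_mongo) are taken or assumed from the question, and whether the connector can push the semi-join filter down to MongoDB is not guaranteed, so treat it as a starting point rather than a drop-in fix:

// Sketch: join against only the URLs present in each micro-batch.
// Assumes the same imports and `session` as in the question's code;
// `aggregatedStream` stands for the streaming DataFrame after .agg(...).
aggregatedStream
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", "/my-custom-data/checkpoints")
  .trigger(Trigger.ProcessingTime(10 * 60 * 1000)) // 10 minutes
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    val urlsDf    = MongoSpark.load(session).toDF
    val batchUrls = batch.select("url").distinct()

    // Keep only the Mongo documents referenced by this batch
    val needed = urlsDf.join(batchUrls, urlsDf("_id") === batchUrls("url"), "left_semi")

    batch
      .join(needed, batch("url") === needed("_id"), "left_outer")
      .select("url", "some_value_from_mongo", "a_name", "b_name")
      .coalesce(24)
      .write
      .mode("append")
      .parquet("gs://my-custom-data")
  }
  .start()
  .awaitTermination()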