
I am running a Spark Structured Streaming job which involves creating an empty DataFrame and updating it with each micro-batch, as shown below. With every micro-batch execution, the number of stages increases by 4. To avoid recomputation, I am persisting the updated staticDF into memory after each update inside the loop. This helps in skipping the additional stages which get created with every new micro-batch.

My questions -

1) Even though the total number of completed stages stays the same because the additional stages are always skipped, can this cause a performance issue, since there can be millions of skipped stages at one point in time?
2) What happens when some part or all of the cached RDD is not available (node/executor failure)? The Spark documentation says that it doesn't materialise the whole data received from the micro-batches so far, so does that mean it will need to read all events again from Kafka to regenerate staticDF?

// required imports
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{desc, max, row_number}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{LongType, StructType}

// one-time creation of an empty static (not streaming) dataframe
val staticDF_schema = new StructType()
  .add("product_id", LongType)
  .add("created_at", LongType)

var staticDF = sparkSession
  .createDataFrame(sparkSession.sparkContext.emptyRDD[Row], staticDF_schema)

// Note : streamingDF was created from a Kafka source
streamingDF.writeStream
  .trigger(Trigger.ProcessingTime(10000L))
  .foreachBatch { (micro_batch_DF: DataFrame, batchId: Long) =>

    // fetching max created_at for each product_id in the current micro-batch
    val staging_df = micro_batch_DF.groupBy("product_id")
      .agg(max("created_at").alias("created_at"))

    // updating staticDF using the current micro-batch, keeping only the latest
    // created_at per product_id, then caching to skip recomputation
    staticDF = staticDF.unionByName(staging_df)
    staticDF = staticDF
      .withColumn("rnk",
        row_number().over(Window.partitionBy("product_id").orderBy(desc("created_at")))
      ).filter("rnk = 1")
      .drop("rnk")
      .cache()
  }
  .start()


Isn't that what checkpoints are for? spark.apache.org/docs/latest/… - mazaneicha
@mazaneicha Thanks, I am looking at the checkpointing documentation here, which says that it stores the offsets and intermediate aggregates to the checkpoint location. It is still not clear whether it will store the staticDF on HDFS or only use the offsets to read everything again from Kafka. spark.apache.org/docs/latest/… - conetfun
@user10938362 Unfortunately not. I understand recompute won't happen as the results are already available, but I am concerned about the overhead during DAG creation, as the total number of stages can reach millions (even though the completed stages will always remain 7 and the rest will be skipped). - conetfun
Checkpoints save both metadata and state. - mazaneicha
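
(For reference, a minimal sketch of the query-level checkpointing discussed in these comments, with an example path; it persists the Kafka offsets and any built-in streaming state for the query, not a DataFrame maintained manually inside foreachBatch.)

streamingDF.writeStream
  .trigger(Trigger.ProcessingTime(10000L))
  .option("checkpointLocation", "/tmp/query_checkpoint")   // example path
  .foreachBatch { (micro_batch_DF: DataFrame, batchId: Long) =>
    // the same staticDF update as in the question goes here
    ()
  }
  .start()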

1 Answer


Even though the skipped stages don't need any computation, my job still started failing after a certain number of batches. This was because the DAG (lineage) kept growing with every batch execution, eventually becoming unmanageable and throwing a stack overflow exception.

To avoid this, I had to break the Spark lineage so that the number of stages doesn't increase with every run (even if they are skipped), as sketched below.
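
A sketch of the idea (not the exact code from my job; the checkpoint directory path is only an example): replace the plain cache() inside foreachBatch with an eager checkpoint, which materialises the result and truncates the logical plan, so the DAG stops growing.

// one-time setup: checkpoint() needs a checkpoint directory (example path)
sparkSession.sparkContext.setCheckpointDir("/tmp/staticDF_checkpoint")

// inside foreachBatch, replacing the plain cache():
staticDF = staticDF.unionByName(staging_df)
staticDF = staticDF
  .withColumn("rnk",
    row_number().over(Window.partitionBy("product_id").orderBy(desc("created_at")))
  ).filter("rnk = 1")
  .drop("rnk")
  .checkpoint()   // eager checkpoint: cuts the lineage so stages don't accumulate

// alternative: localCheckpoint() also truncates the lineage but keeps the data
// only on executor storage, so it is faster but not fault-tolerant
// staticDF = staticDF.localCheckpoint()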