
On Spark 2.0.x, I have been using a JobProgressListener implementation to retrieve Job/Stage/Task progress information in real-time from our cluster. I understand how the event flow works, and successfully receive updates on the work.

My problem is that we have several different submissions running at the same time on the same SparkContext, and it is seemingly impossible to tell which Job/Stage/Task belongs to which submission. Each Job/Stage/Task receives a unique id, which is great. However, I'm looking for a way to attach a submission "id" or "name" that would be returned along with the received JobProgressListener event objects.

I realize that the "Job Group" can be set on the Spark Context, but if multiple jobs are simultaneously running on the same context, they will become scrambled.
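For reference, this is roughly what setting a job group looks like. It is a sketch only: `sc` is assumed to be an existing SparkContext, and the group id and description strings are made up. Note that `setJobGroup` is per-thread, which is why it doesn't distinguish jobs launched concurrently from the same thread:

```scala
// Sketch, assuming `sc` is an existing SparkContext.
// The group id "etl-run-42" and description are hypothetical examples.
sc.setJobGroup("etl-run-42", "nightly ETL", interruptOnCancel = true)
// ... run actions on this thread; all resulting jobs carry this group id
sc.clearJobGroup()
```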

Is there a way I can sneak in custom properties that would be returned with the listener events for a single SQLContext? In so doing, I should be able to link up subsequent Stage and Task events and get what I need.

Please note: I am not using spark-submit for these jobs. They are being executed using Java references to a SparkSession/SQLContext.

Thanks for any solutions or ideas.

1 Answer


I'm using a local property - it can be read from the listener in the onStageSubmitted event. After that, I use the corresponding stageId to identify the tasks executed during that stage.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

Future {
  // Tag everything submitted from this thread; the value is delivered
  // to the listener in the stage's properties.
  sc.setLocalProperty("job-context", "second")
  val listener = new MetricListener("second")
  sc.addSparkListener(listener)
  // do some spark actions
  val df = spark.read.load("...")
  val countResult = df.filter(....).count()
  println(listener.rows)
  sc.removeSparkListener(listener)
}

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted, SparkListenerTaskEnd}

class MetricListener(name: String) extends SparkListener {

  var rows: Long = 0L
  var stageId = -1

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
    // properties can be null when no local properties were set for the job
    if (stageSubmitted.properties != null &&
        stageSubmitted.properties.getProperty("job-context") == name) {
      stageId = stageSubmitted.stageInfo.stageId
    }
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // Count input rows only for tasks belonging to the tagged stage
    if (taskEnd.stageId == stageId)
      rows += taskEnd.taskMetrics.inputMetrics.recordsRead
  }
}
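To make the matching explicit: the local property set on the context arrives as a plain java.util.Properties on the stage-submitted event, so the check reduces to a simple lookup. This is a standalone sketch; `belongsTo` is a hypothetical helper, not part of the Spark API:

```scala
import java.util.Properties

// Minimal sketch of the listener's matching rule: a stage belongs to a
// submission when its properties carry the expected "job-context" value.
// `belongsTo` is a hypothetical helper name, not a Spark API.
def belongsTo(stageProps: Properties, name: String): Boolean =
  stageProps != null && stageProps.getProperty("job-context") == name

val tagged = new Properties()
tagged.setProperty("job-context", "second")
println(belongsTo(tagged, "second")) // true
println(belongsTo(tagged, "first"))  // false
```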