
INPUT:

I have the two datasets:

  1. samples_1 dataset, which has the columns timestamp, id, x, and y, and 500M records.
  2. samples_2 dataset, which has the same columns as samples_1, and 50M records.

NOTES:

  • In a single dataset, timestamp and id together form the unique key of each record, i.e., either timestamp or id alone can be duplicated.
  • Across datasets, an id from one dataset never appears in the other one. Still, a timestamp can be duplicated across the two datasets.
  • My cluster contains a driver node and five worker nodes, each with 16 cores and 64 GB of RAM.
  • I'm assigning 15 executors to my job, each with 5 cores and 19 GB of RAM.
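For reference, a layout like the one above would typically be expressed at submission time roughly as follows. This is only a sketch: the master URL, jar name, and the shuffle-partitions value mentioned later in the question are placeholders/assumptions, not details from the original job.

```shell
# Hypothetical spark-submit invocation matching the executor layout described
# above (15 executors x 5 cores x 19 GB); master URL and jar name are placeholders.
spark-submit \
  --master yarn \
  --num-executors 15 \
  --executor-cores 5 \
  --executor-memory 19g \
  --conf spark.sql.shuffle.partitions=5000 \
  my-join-job.jar
```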

QUESTION:

What I'm trying to do is: for each (timestamp_1, id_1) tuple in samples_1, I need to find all (timestamp_2, id_2, x_2, y_2) tuples from samples_2 where timestamp_1 equals timestamp_2.

WHAT I HAVE TRIED:

// Requires: import scala.collection.mutable
//           import org.apache.spark.sql.Row
//           import org.apache.spark.sql.functions.{struct, collect_list}
samples_2
  .withColumn("combined", struct("id", "x", "y"))
  .groupBy("timestamp")
  .agg(collect_list("combined").as("combined_list"))
  .join(samples_1, Seq("timestamp"), "rightouter") // keep every samples_1 record
  .map {
    case Row(timestamp: String, samples: mutable.WrappedArray[_], id_1: String, x_1: Float, y_1: Float) =>
      val overlappingSamples = samples.map { case Row(id_2: String, x_2: Float, y_2: Float) => (id_2, x_2, y_2) }

      if (overlappingSamples.nonEmpty) {
        val stringifiedSamples = overlappingSamples.map(x => s"${x._1}:${x._2}:${x._3}")
        (timestamp, id_1, stringifiedSamples.mkString("&"))
      } else {
        (timestamp, id_1, "")
      }

    case Row(timestamp: String, _, id_1: String, x_1: Float, y_1: Float) => // no overlapping samples
      (timestamp, id_1, "")
  }
  .write
  .csv(outputPath)

I have tried this code (using smaller datasets), and it gave the results I'm looking for. The issue is that it becomes seriously slow when I run it against the larger datasets. I read that I should configure the number of shuffle partitions through --conf spark.sql.shuffle.partitions=5000, but that didn't solve the problem.


1 Answer


The problem I see in the above query is that there are too many shuffle operations chained together. I didn't check the actual logic of the join, but there is a common issue in Spark that needs to be handled here.

In my opinion, when the execution DAG becomes long in Spark, the job becomes a little fragile. The reason is that any failure in an early stage requires recomputing the whole DAG.

The strategy I take is to break the DAG (or lineage) into multiple smaller DAGs by persisting the result of each join.

val result = datasetA.join(datasetB).persist()
result.count() // forces the materialization
// use the result variable in the next join

The count here is mandatory: like other Spark operations, persist is lazy, so an explicit action (the count) is required to force the join and materialize the result.
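Applied to the job in the question, the same pattern might look roughly like this. This is only a sketch, not a tested rewrite: it reuses samples_1, samples_2, and the functions from the question, and the choice of storage level and of the grouped dataset as the thing worth persisting are my assumptions.

```scala
import org.apache.spark.sql.functions.{struct, collect_list}
import org.apache.spark.storage.StorageLevel

// Hypothetical adaptation of the persist/count pattern: cut the lineage after
// the expensive groupBy/collect_list aggregation, before the big join.
val grouped = samples_2
  .withColumn("combined", struct("id", "x", "y"))
  .groupBy("timestamp")
  .agg(collect_list("combined").as("combined_list"))
  .persist(StorageLevel.MEMORY_AND_DISK) // spill to disk rather than recompute

grouped.count() // action: forces the aggregation to materialize here

// The join (and the map/write that follow) now start from a short lineage,
// so a stage failure no longer recomputes the aggregation.
val joined = grouped.join(samples_1, Seq("timestamp"), "rightouter")
```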

You could try the same for your job and check the performance.