INPUT:
I have two datasets:
- samples_1, which has the columns timestamp, id, x, y and contains 500M records.
- samples_2, which has the same columns as samples_1 and contains 50M records.
NOTES:
- Within a single dataset, timestamp and id together form the unique key of each record; i.e., timestamp alone or id alone can be duplicated.
- Across datasets, an id from one dataset never appears in the other. A timestamp, however, can appear in both datasets.
- My cluster has one driver node and five worker nodes, each with 16 cores and 64 GB of RAM.
- I'm assigning 15 executors to the job, each with 5 cores and 19 GB of RAM.
QUESTION:
What I'm trying to do is: for each (timestamp_1, id_1) tuple in samples_1, find all (timestamp_2, id_2, x_2, y_2) tuples in samples_2 where timestamp_1 equals timestamp_2.
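To make the expected semantics concrete, here is a toy model of the operation using plain Scala collections in place of the DataFrames (all sample values below are made up for illustration):

```scala
// Tiny stand-ins for the two datasets: (timestamp, id, x, y).
// The values are hypothetical, chosen only to show the expected shape.
val samples1 = Seq(("t1", "a1", 1.0f, 2.0f), ("t2", "a2", 3.0f, 4.0f))
val samples2 = Seq(("t1", "b1", 5.0f, 6.0f), ("t1", "b2", 7.0f, 8.0f))

// For every (timestamp, id) pair in samples1, gather all samples2
// records that share the same timestamp.
val result = samples1.map { case (ts1, id1, _, _) =>
  val overlapping = samples2.collect {
    case (ts2, id2, x2, y2) if ts2 == ts1 => (id2, x2, y2)
  }
  (ts1, id1, overlapping)
}
// t1 overlaps with both samples2 records; t2 overlaps with none.
```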
WHAT I HAVE TRIED:
import scala.collection.mutable
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.functions.{collect_list, struct}
import spark.implicits._ // spark is the SparkSession

samples_2
  .withColumn("combined", struct("id", "x", "y"))
  .groupBy("timestamp")
  .agg(collect_list("combined").as("combined_list"))
  .join(samples_1, Seq("timestamp"), "rightouter") // keep every samples_1 record
  .map {
    case Row(timestamp: String, samples: mutable.WrappedArray[GenericRowWithSchema], id_1: String, x_1: Float, y_1: Float) =>
      val overlappingSamples = samples.map { case Row(id_2: String, x_2: Float, y_2: Float) => (id_2, x_2, y_2) }
      if (overlappingSamples.nonEmpty) {
        val stringifiedSamples = overlappingSamples.map(x => s"${x._1}:${x._2}:${x._3}")
        (timestamp, id_1, stringifiedSamples.mkString("&"))
      } else {
        (timestamp, id_1, "")
      }
    case Row(timestamp: String, _, id_1: String, x_1: Float, y_1: Float) => // no overlapping samples
      (timestamp, id_1, "")
  }
  .write
  .csv(outputPath)
I have tried this code on smaller datasets, and it produced the results I'm looking for. The issue is that it becomes seriously slow when I run it against the full-size datasets. I read that I should configure the number of shuffle partitions via --conf spark.sql.shuffle.partitions=5000, but that didn't solve the problem.
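For reference, this is roughly how I'm submitting the job; the class and jar names below are placeholders, not my real ones, and the executor settings match the numbers in the notes above:

```shell
# Placeholder class/jar names; executor settings as described above.
spark-submit \
  --class com.example.OverlapJob \
  --num-executors 15 \
  --executor-cores 5 \
  --executor-memory 19g \
  --conf spark.sql.shuffle.partitions=5000 \
  overlap-job.jar
```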