
I am trying to process a dataset that is approximately 2 TB using a cluster with 4.5 TB of RAM. The data is in Parquet format and is initially loaded into a DataFrame. A subset of the data is then queried for and converted to an RDD for more complicated processing. The first stage of that processing is a mapToPair that uses each row's ID as the key in a tuple. The data then goes through a combineByKey operation to group all values with the same key. This operation always exceeds the maximum cluster memory and the job eventually fails. While it is shuffling, there are a lot of "spilling in-memory map to disk" messages.

I am wondering: if the data were initially partitioned so that all the rows with the same ID resided within the same partition, would it need to do less shuffling and perform correctly?

To do the initial load I am using:

sqlContext.read().parquet(inputPathArray).repartition(10000, new Column("id"));

I am not sure if this is the correct way to partition a DataFrame, so my first question is: is the above correct?

My next question is: when I go from the DataFrame to an RDD using:

JavaRDD<LocationRecord> locationsForSpecificKey = sqlc.sql("SELECT * FROM standardlocationrecords WHERE customerID = " + customerID + " AND partnerAppID = " + partnerAppID)
                    .toJavaRDD().map(new LocationRecordFromRow()::apply);

is the partitioning scheme from the DataFrame preserved, or do I need to repartition after doing a mapToPair, using rdd.partitionBy with a custom HashPartitioner that uses the hash of the ID field?

My goal is to reduce the shuffling when doing the final combineByKey to prevent the job from running out of memory and failing. Any help would be greatly appreciated.

Thanks, Nathan


1 Answer


I am not sure if this is the correct way to partition a dataframe

It looks about right.

is the partition scheme from the dataframe preserved

Data distribution should be preserved, which can easily be checked by looking at the debug string:

val df = sqlContext.read.parquet("/tmp/foo").repartition(10000, $"id")

df.rdd.toDebugString
// String =
// (10000) MapPartitionsRDD[46] at rdd at <console>:26 []
//    |    ShuffledRowRDD[45] at rdd at <console>:26 []
//    +-(8) MapPartitionsRDD[44] at rdd at <console>:26 []
//       |  $anon$1[43] at  []

but there is no partitioner set for the output RDD:

df.rdd.partitioner
// Option[org.apache.spark.Partitioner] = None

so this information cannot be used to optimize subsequent aggregation.
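
If you wanted downstream RDD operations to actually see a partitioner, you would have to key the rows and partition them explicitly yourself. A minimal sketch, reusing df from above and assuming id is a string column:

import org.apache.spark.HashPartitioner

// Key each Row by its "id" field and partition explicitly so the
// resulting RDD carries a partitioner.
val keyed = df.rdd
  .map(row => (row.getAs[String]("id"), row))
  .partitionBy(new HashPartitioner(10000))

keyed.partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2710)

Keep in mind that this doesn't remove a shuffle; it only moves it in front of the aggregation.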

My goal is to reduce the shuffling

If so, it doesn't look like a correct approach. Assuming that the mergeValue function passed to combineByKey is a reducing operation, partitioning up front actually makes you shuffle more data than applying combineByKey directly, because combineByKey already combines values map-side before the shuffle. If that is not the case (merging does not reduce the amount of data), then applying combineByKey with mapSideCombine set to false is probably a better choice.
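
For the second case, a rough sketch of what that could look like, again reusing df and simply collecting rows into lists (the combiner types are illustrative only):

import org.apache.spark.HashPartitioner
import org.apache.spark.sql.Row

// Group whole rows per id without map-side combining, since building lists
// does not shrink the data before the shuffle anyway.
val grouped = df.rdd
  .map(row => (row.getAs[String]("id"), row))
  .combineByKey[List[Row]](
    (v: Row) => List(v),                      // createCombiner
    (acc: List[Row], v: Row) => v :: acc,     // mergeValue
    (a: List[Row], b: List[Row]) => a ::: b,  // mergeCombiners
    new HashPartitioner(10000),
    mapSideCombine = false)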

Depending on the combine logic, you should also consider performing the aggregation directly on the DataFrame.
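
For example, if the per-key logic can be expressed with built-in aggregate functions, something along these lines (someMetric is a placeholder column name) lets Catalyst handle partial aggregation and spilling for you:

import org.apache.spark.sql.functions._

// Sketch with a made-up column: aggregate per id directly on the DataFrame
// instead of grouping rows by hand in an RDD.
val aggregated = df
  .groupBy($"id")
  .agg(count($"id").as("n"), sum($"someMetric").as("totalMetric"))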