10
votes

What is the best way to partition the data by a field into predefined partition count?

I am currently partitioning the data by specifying the partionCount=600. The count 600 is found to give best query performance for my dataset/cluster setup.

val rawJson = sqlContext.read.json(filename).coalesce(600)
rawJson.write.parquet(filenameParquet)

Now I want to partition this data by the column 'eventName' but still keep the count 600. The data currently has around 2000 unique eventNames, plus the number of rows in each eventName is not uniform. Around 10 eventNames have more than 50% of the data causing data skew. Hence if I do the partitioning like below, its not very performant. The write is taking 5x more time than without.

val rawJson = sqlContext.read.json(filename)
rawJson.write.partitionBy("eventName").parquet(filenameParquet)

What is a good way to partition the data for these scenarios? Is there a way to partition by eventName but spread this into 600 partitions?

My schema looks like this:

{  
  "eventName": "name1",
  "time": "2016-06-20T11:57:19.4941368-04:00",
  "data": {
    "type": "EventData",
    "dataDetails": {
      "name": "detailed1",
      "id": "1234",
...
...
    }
  }
} 

Thanks!

1

1 Answers

9
votes

This is a common problem with skewed data and there are several approaches you can take.

List bucketing works if the skew remains stable over time, which may or may not be the case, especially if new values of the partitioning variable are introduced. I have not researched how easy it is to adjust list bucketing over time and, as your comment states, you can't use that anyway because it is a Spark 2.0 feature.

If you are on 1.6.x, the key observation is that you can create your own function that maps each event name into one of 600 unique values. You can do this as a UDF or as a case expression. Then, you simply create a column using that function and then partition by that column using repartition(600, 'myPartitionCol) as opposed to coalesce(600).

Because we deal with very skewed data at Swoop, I've found the following workhorse data structure to be quite useful for building partitioning-related tools.

/** Given a key, returns a random number in the range [x, y) where
  * x and y are the numbers in the tuple associated with a key.
  */
class RandomRangeMap[A](private val m: Map[A, (Int, Int)]) extends Serializable {
  private val r = new java.util.Random() // Scala Random is not serializable in 2.10

  def apply(key: A): Int = {
    val (start, end) = m(key)
    start + r.nextInt(end - start)
  }

  override def toString = s"RandomRangeMap($r, $m)"
}

For example, here is how we build a partitioner for a slightly different case: one where the data is skewed and the number of keys is small so we have to increase the number of partitions for the skewed keys while sticking with 1 as the minimum number of partitions per key:

/** Partitions data such that each unique key ends in P(key) partitions.
  * Must be instantiated with a sequence of unique keys and their Ps.
  * Partition sizes can be highly-skewed by the data, which is where the
  * multiples come in.
  *
  * @param keyMap  maps key values to their partition multiples
  */
class ByKeyPartitionerWithMultiples(val keyMap: Map[Any, Int]) extends Partitioner {
  private val rrm = new RandomRangeMap(
    keyMap.keys
      .zip(
        keyMap.values
          .scanLeft(0)(_+_)
          .zip(keyMap.values)
          .map {
            case (start, count) => (start, start + count)
          }
      )
      .toMap
  )

  override val numPartitions =
    keyMap.values.sum

  override def getPartition(key: Any): Int =
    rrm(key)
}

object ByKeyPartitionerWithMultiples {

  /** Builds a UDF with a ByKeyPartitionerWithMultiples in a closure.
    *
    * @param keyMap  maps key values to their partition multiples
    */
  def udf(keyMap: Map[String, Int]) = {
    val partitioner = new ByKeyPartitionerWithMultiples(keyMap.asInstanceOf[Map[Any, Int]])
    (key:String) => partitioner.getPartition(key)
  }

}

In your case, you have to merge several event names into a single partition, which would require changes but I hope the code above gives you an idea how to approach the problem.

One final observation is that if the distribution of event names values a lot in your data over time, you can perform a statistics gathering pass over some part of the data to compute a mapping table. You don't have to do this all the time, just when it is needed. To determine that, you can look at the number of rows and/or size of output files in each partition. In other words, the entire process can be automated as part of your Spark jobs.