6
votes

I can't find much documentation on guaranteeing partitioning order. I just want to ensure that, given a set of deterministic transformations (output rows always the same), partitions always receive the same set of elements as long as the underlying dataset doesn't change. Is that possible?

It doesn't need to be sorted. For example: after a set of transformations is applied to an RDD, it now looks like this -> (A, B, C, D, E, F, G)

And if my spark.default.parallelism were 2 or 3, the sets of elements would always be either (A, B, C, D), (E, F, G) or (A, B), (C, D), (E, F, G), respectively.

This matters because my executors will cause side effects based on the partition (set of elements) they operate on, and I want to make sure that the Spark application is idempotent (the same side effects if it restarts).

Edit: Apparently, DF repartition is deterministic but RDD repartition is not (Spark 2.4.4).

def f1(rdds):
    # Summarize one partition: collect its rows, sort them deterministically,
    # then report the partition size and its first and last (origin, dest) pair.
    rows = list(rdds)
    stats_summary = [{
        'origin': str(row['origin']),
        'dest': str(row['dest']),
        'start_time': analysis_date.isoformat(),  # analysis_date is defined elsewhere
        'value': row['count']
    } for row in rows]

    stats_summary.sort(key=lambda t: (t['start_time'], t['origin'], t['dest']))

    rtn = "partition size: {}, first: ({}, {}), last: ({}, {})".format(
        len(rows),
        stats_summary[0]["origin"], stats_summary[0]["dest"],
        stats_summary[-1]["origin"], stats_summary[-1]["dest"])
    return [rtn]

repartition_rdd_res = unq_statistics.rdd \
                                    .repartition(10) \
                                    .mapPartitions(f1) \
                                    .collect()

repartition_df_res = unq_statistics.repartition(10) \
                                   .rdd \
                                   .mapPartitions(f1) \
                                   .collect()

repartition_rdd_res4 = ['partition size: 131200, first: (-1, -1), last: (999, -1)',
 'partition size: 131209, first: (-1, 1014), last: (996, 996)',
 'partition size: 131216, first: (-1, 1021), last: (999, 667)',
 'partition size: 131218, first: (-1, 1008), last: (991, 1240)',
 'partition size: 131222, first: (-1, 1001), last: (994, 992)',
 'partition size: 131229, first: (-1, 1007), last: (994, 890)',
 'partition size: 131233, first: (-1, 1004), last: (991, -1)',
 'partition size: 131235, first: (-1, 1005), last: (999, 1197)',
 'partition size: 131237, first: (-1, 100), last: (999, 997)',
 'partition size: 131240, first: (-1, 1010), last: (994, -1)']

repartition_rdd_res3 = ['partition size: 131200, first: (-1, -1), last: (999, -1)',
 'partition size: 131209, first: (-1, 1006), last: (994, 2048)',
 'partition size: 131216, first: (-1, 1002), last: (996, 996)',
 'partition size: 131218, first: (-1, 1017), last: (999, 667)',
 'partition size: 131222, first: (-1, 1008), last: (994, 890)',
 'partition size: 131229, first: (-1, 1000), last: (99, 96)',
 'partition size: 131233, first: (-1, 1001), last: (994, 992)',
 'partition size: 131235, first: (-1, 1009), last: (990, 1601)',
 'partition size: 131237, first: (-1, 1004), last: (994, -1)',
 'partition size: 131240, first: (-1, 1003), last: (999, 997)']

repartition_rdd_res2 = ['partition size: 131200, first: (-1, 1013), last: (991, 2248)',
 'partition size: 131209, first: (-1, 1007), last: (999, 667)',
 'partition size: 131216, first: (-1, 100), last: (99, 963)',
 'partition size: 131218, first: (-1, 1002), last: (999, 997)',
 'partition size: 131222, first: (-1, 101), last: (996, 996)',
 'partition size: 131229, first: (-1, -1), last: (991, 1240)',
 'partition size: 131233, first: (-1, 1006), last: (999, 1197)',
 'partition size: 131235, first: (-1, 1001), last: (994, 992)',
 'partition size: 131237, first: (-1, 1019), last: (999, -1)',
 'partition size: 131240, first: (-1, 1017), last: (991, -1)']

repartition_df_res2 = ['partition size: 131222, first: (-1, 1023), last: (996, 996)',
 'partition size: 131223, first: (-1, 1003), last: (999, 667)',
 'partition size: 131223, first: (-1, 1012), last: (990, 990)',
 'partition size: 131224, first: (-1, -1), last: (999, 1558)',
 'partition size: 131224, first: (-1, 100), last: (99, 98)',
 'partition size: 131224, first: (-1, 1008), last: (99, 968)',
 'partition size: 131224, first: (-1, 1018), last: (999, 997)',
 'partition size: 131225, first: (-1, 1006), last: (994, 992)',
 'partition size: 131225, first: (-1, 101), last: (990, 935)',
 'partition size: 131225, first: (-1, 1013), last: (999, 1197)']

5 Answers

6
votes

Let's look at the source of repartition, and specifically its shuffle part:

...
if (shuffle) {
  /** Distributes elements evenly across output partitions, starting from a random partition. */
  val distributePartition = (index: Int, items: Iterator[T]) => {
    var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
    items.map { t =>
      // Note that the hash code of the key will just be the key itself. The HashPartitioner
      // will mod it with the number of total partitions.
      position = position + 1
      (position, t)
    }
  } : Iterator[(Int, T)]
  ...

As you can see, the distribution of elements from a given source partition N into X target partitions is a simple increment (later taken modulo X by the HashPartitioner), starting from a number that depends only on N, and is hence predetermined. So if your source RDD is unchanged, the result of repartition(X) should be the same every time as well.
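To see why this is deterministic, the round-robin logic can be sketched in plain Python. `assign_partitions` is a hypothetical helper, not Spark code: Spark seeds java.util.Random with hashing.byteswap32(index), while Python's random.Random stands in here purely for illustration.

```python
import random

def assign_partitions(source_partitions, num_target):
    """Sketch of distributePartition: each source partition index seeds a
    deterministic starting offset, then elements go out round-robin."""
    assignment = {}
    for index, items in enumerate(source_partitions):
        # Seed depends only on the source partition index -> same every run.
        position = random.Random(index).randrange(num_target)
        for item in items:
            position += 1                              # simple increment...
            assignment[item] = position % num_target   # ...modulo'ed by X
    return assignment

source = [["A", "B", "C"], ["D", "E"], ["F", "G"]]
# Same input partitioning -> identical placement on every invocation.
assert assign_partitions(source, 2) == assign_partitions(source, 2)
```

The key point is that nothing here depends on runtime state: given the same source partitions and the same target count, the assignment is fully reproducible.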

1
vote

Internally, Spark uses a default partitioner (HashPartitioner, depending on the data) to partition the data; it hashes each item's key to identify which partition that item belongs to. Thus a data item will always go to the same partition as long as the partition count stays the same, because changing the partition count changes the result of the modulo as well.
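The placement rule can be shown in a few lines. `hash_partition` is a toy stand-in for HashPartitioner's getPartition, not Spark's implementation: Spark uses the JVM's hashCode, not Python's hash, so the actual partition numbers will differ.

```python
def hash_partition(key, num_partitions):
    # HashPartitioner-style placement: hash the key and take it modulo the
    # partition count. (Python's % always returns a non-negative result for
    # a positive modulus, so no extra sign fix-up is needed here.)
    return hash(key) % num_partitions

# The same key with the same partition count always lands in the same place...
assert hash_partition(10, 4) == hash_partition(10, 4)
# ...but a different partition count changes the modulo, and hence the placement.
print(hash_partition(10, 4), hash_partition(10, 5))
```

This is exactly why the answer hedges on the partition count: the hash of a key is fixed, but its modulo is only stable while the number of partitions is.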

0
votes

Regarding fault tolerance: if a partition fails, it will be resubmitted to another executor, so partitions aren't ordered.

Spark has internal mechanisms to calculate the best execution plan for the data you have, so you cannot predict the order or the content of partitions.

0
votes

The distribution of records across partitions needn't be uniform. The number of partitions is guaranteed, and the number of records in each partition will be roughly the same; this doesn't matter for any operation. If a shuffle happens for any reason, new partitions will be created.

Say (A, B, C, D, E, F, G) got partitioned into 2 as (A, B, C, D) and (E, F, G). If the executor processing (E, F, G) dies, Spark will restart it and try to reprocess (E, F, G). If that executor is not recoverable, the whole job will fail, start over by partitioning (A, B, C, D, E, F, G) into 2 pieces again, and restart processing. In the second attempt it may split them as (A, B, C) and (D, E, F, G), but the end result of the processing will be the same.

0
votes

In addition to what everyone says:

  • Dataframe is based on RDD.
  • You say that Dataframe repartitioning is deterministic.
  • If RDD repartitioning were not deterministic, then Dataframe repartitioning would not be either.