I can't find much documentation on controlling how rows are assigned to partitions. I just want to ensure that, given a set of deterministic transformations (the output rows are always the same), each partition always receives the same set of elements as long as the underlying dataset doesn't change. Is that possible?
It doesn't need to be sorted. For example, suppose that after a set of transformations is applied to an RDD, it looks like this: (A, B, C, D, E, F, G)
If my `spark.default.parallelism` were 2 or 3, the partitions should always be (A, B, C, D), (E, F, G) or (A, B), (C, D), (E, F, G), respectively.
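The property being asked for is that partition contents are a pure function of the ordered rows and the partition count. A minimal pure-Python sketch of such an assignment, using a hypothetical `contiguous_slices` helper with integer chunk boundaries (an illustration of the property, not Spark code):

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def contiguous_slices(rows: Sequence[T], num_partitions: int) -> List[List[T]]:
    """Split an ordered sequence into num_partitions contiguous chunks.

    Hypothetical helper: the boundaries depend only on len(rows) and
    num_partitions, so the same ordered input always yields the same chunks.
    """
    n = len(rows)
    return [
        list(rows[(i * n) // num_partitions:((i + 1) * n) // num_partitions])
        for i in range(num_partitions)
    ]
```

With this particular boundary rule, `contiguous_slices("ABCDEFG", 3)` gives (A, B), (C, D), (E, F, G), while 2 partitions gives (A, B, C), (D, E, F, G) rather than (A, B, C, D), (E, F, G). Either rule would do, as long as it never changes between runs and the input order is itself stable.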
This matters because my executors cause side effects based on the partition (the set of elements) they are operating on, and I want the Spark application to be idempotent (the same side effects if it restarts).
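One way to make such side effects idempotent is to derive each row's partition from a key in the row rather than from the order in which rows arrive. A pure-Python sketch of that idea, assuming (hypothetically) that `(origin, dest)` uniquely identifies a row and using `zlib.crc32` as a stable hash. In Spark itself the rough equivalent would be hash-partitioning on those columns, e.g. DataFrame `repartition(10, "origin", "dest")`, which uses Spark's own hash function rather than crc32.

```python
import zlib
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Row = Tuple[str, str]  # (origin, dest); assumed to uniquely key a row


def stable_partition(key: Row, num_partitions: int) -> int:
    # crc32 returns the same value in every process and run, unlike the
    # built-in hash() on str, which is salted per interpreter process.
    data = "\x1f".join(key).encode("utf-8")
    return zlib.crc32(data) % num_partitions


def assign(rows: Iterable[Row], num_partitions: int) -> Dict[int, List[Row]]:
    buckets: Dict[int, List[Row]] = defaultdict(list)
    for row in rows:
        buckets[stable_partition(row, num_partitions)].append(row)
    # Sort inside each bucket so the order of arrival does not matter either.
    return {pid: sorted(rs) for pid, rs in buckets.items()}
```

The trade-off is that partition sizes are no longer exactly balanced, as they would be with round-robin distribution; in exchange, a restarted job sends every key to the same partition.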
Edit: Apparently, DataFrame `repartition` is deterministic but RDD `repartition` is not (Spark 2.4.4). This is the test I ran:
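A plausible explanation, hedged: RDD `repartition` deals records out round-robin in whatever order they arrive from the upstream partitions, and that order can change between runs (for example, after an earlier shuffle such as a `groupBy` or `distinct`). Since the fix for SPARK-23207, Spark SQL's round-robin `repartition` inserts a local sort before distributing rows (controlled by `spark.sql.execution.sortBeforeRepartition`, default `true`), which removes the dependence on arrival order. The following is a simplified pure-Python model of that effect, not Spark's implementation; `round_robin` and its parameters are hypothetical:

```python
from typing import List, Sequence


def round_robin(rows: Sequence[str], num_partitions: int, start: int = 0,
                sort_first: bool = False) -> List[List[str]]:
    """Deal rows out to partitions one at a time, starting at `start`.

    With sort_first=False the result depends on the order rows arrive in;
    with sort_first=True it depends only on which rows are present.
    """
    ordered = sorted(rows) if sort_first else list(rows)
    out: List[List[str]] = [[] for _ in range(num_partitions)]
    for i, row in enumerate(ordered):
        out[(start + i) % num_partitions].append(row)
    return out


run1 = ["A", "B", "C", "D", "E", "F", "G"]
run2 = ["B", "A", "D", "C", "G", "F", "E"]  # same rows, different arrival order
```

Here `round_robin(run1, 3)` and `round_robin(run2, 3)` disagree, while both calls with `sort_first=True` return the same partitions.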
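```python
# Summarize one partition: its size plus the first and last (origin, dest)
# pair after sorting. `analysis_date` is defined elsewhere in my job.
def f1(rows_iter):
    rows = list(rows_iter)
    if not rows:
        # Avoid an IndexError on stats_summary[0] for empty partitions.
        return ["partition size: 0"]
    stats_summary = [{
        'origin': str(row['origin']),
        'dest': str(row['dest']),
        'start_time': analysis_date.isoformat(),
        'value': row['count']
    } for row in rows]
    stats_summary.sort(key=lambda t: (t['start_time'], t['origin'], t['dest']))
    rtn = "partition size: {}, first: ({}, {}), last: ({}, {})".format(
        len(rows),
        stats_summary[0]["origin"], stats_summary[0]["dest"],
        stats_summary[-1]["origin"], stats_summary[-1]["dest"])
    return [rtn]


# Variant 1: repartition the RDD
repartition_rdd_res = unq_statistics.rdd \
    .repartition(10) \
    .mapPartitions(f1) \
    .collect()

# Variant 2: repartition the DataFrame, then switch to the RDD API
repartition_df_res = unq_statistics.repartition(10) \
    .rdd \
    .mapPartitions(f1) \
    .collect()
```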
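Comparing only the first and last rows can miss differences in the middle of a partition. A stricter variant of `f1`, sketched under the assumption that each row exposes `origin` and `dest` as in `f1` (the name `partition_fingerprint` is hypothetical), digests the partition's full set of keys independently of row order:

```python
import hashlib
from typing import Iterable, List


def partition_fingerprint(rows: Iterable) -> List[str]:
    """Summarize a partition by an order-independent digest of its keys.

    Intended as a drop-in for f1 in mapPartitions; only 'origin' and 'dest'
    are hashed, so rows differing only in other columns are not distinguished.
    """
    keys = sorted("{}\x1f{}".format(r["origin"], r["dest"]) for r in rows)
    digest = hashlib.sha256("\n".join(keys).encode("utf-8")).hexdigest()
    return ["partition size: {}, digest: {}".format(len(keys), digest[:16])]
```

Used as `unq_statistics.repartition(10).rdd.mapPartitions(partition_fingerprint).collect()`, two runs yield equal lists only if each partition index received exactly the same keys, since `collect()` returns partition results in index order.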
Output from several runs:

```python
repartition_rdd_res4 = ['partition size: 131200, first: (-1, -1), last: (999, -1)',
                        'partition size: 131209, first: (-1, 1014), last: (996, 996)',
                        'partition size: 131216, first: (-1, 1021), last: (999, 667)',
                        'partition size: 131218, first: (-1, 1008), last: (991, 1240)',
                        'partition size: 131222, first: (-1, 1001), last: (994, 992)',
                        'partition size: 131229, first: (-1, 1007), last: (994, 890)',
                        'partition size: 131233, first: (-1, 1004), last: (991, -1)',
                        'partition size: 131235, first: (-1, 1005), last: (999, 1197)',
                        'partition size: 131237, first: (-1, 100), last: (999, 997)',
                        'partition size: 131240, first: (-1, 1010), last: (994, -1)']

repartition_rdd_res3 = ['partition size: 131200, first: (-1, -1), last: (999, -1)',
                        'partition size: 131209, first: (-1, 1006), last: (994, 2048)',
                        'partition size: 131216, first: (-1, 1002), last: (996, 996)',
                        'partition size: 131218, first: (-1, 1017), last: (999, 667)',
                        'partition size: 131222, first: (-1, 1008), last: (994, 890)',
                        'partition size: 131229, first: (-1, 1000), last: (99, 96)',
                        'partition size: 131233, first: (-1, 1001), last: (994, 992)',
                        'partition size: 131235, first: (-1, 1009), last: (990, 1601)',
                        'partition size: 131237, first: (-1, 1004), last: (994, -1)',
                        'partition size: 131240, first: (-1, 1003), last: (999, 997)']

repartition_rdd_res2 = ['partition size: 131200, first: (-1, 1013), last: (991, 2248)',
                        'partition size: 131209, first: (-1, 1007), last: (999, 667)',
                        'partition size: 131216, first: (-1, 100), last: (99, 963)',
                        'partition size: 131218, first: (-1, 1002), last: (999, 997)',
                        'partition size: 131222, first: (-1, 101), last: (996, 996)',
                        'partition size: 131229, first: (-1, -1), last: (991, 1240)',
                        'partition size: 131233, first: (-1, 1006), last: (999, 1197)',
                        'partition size: 131235, first: (-1, 1001), last: (994, 992)',
                        'partition size: 131237, first: (-1, 1019), last: (999, -1)',
                        'partition size: 131240, first: (-1, 1017), last: (991, -1)']

repartition_df_res2 = ['partition size: 131222, first: (-1, 1023), last: (996, 996)',
                       'partition size: 131223, first: (-1, 1003), last: (999, 667)',
                       'partition size: 131223, first: (-1, 1012), last: (990, 990)',
                       'partition size: 131224, first: (-1, -1), last: (999, 1558)',
                       'partition size: 131224, first: (-1, 100), last: (99, 98)',
                       'partition size: 131224, first: (-1, 1008), last: (99, 968)',
                       'partition size: 131224, first: (-1, 1018), last: (999, 997)',
                       'partition size: 131225, first: (-1, 1006), last: (994, 992)',
                       'partition size: 131225, first: (-1, 101), last: (990, 935)',
                       'partition size: 131225, first: (-1, 1013), last: (999, 1197)']
```
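Runs like these can be compared mechanically rather than by eye. A small hypothetical helper that reports whether two collected runs match partition by partition, and whether they at least contain the same summaries regardless of index:

```python
from collections import Counter
from typing import List, Tuple


def compare_runs(run_a: List[str], run_b: List[str]) -> Tuple[bool, bool]:
    """Return (same_by_index, same_as_multiset) for two collected runs.

    same_by_index: partition i got the same summary in both runs.
    same_as_multiset: the same summaries occur, possibly under other indices.
    """
    return run_a == run_b, Counter(run_a) == Counter(run_b)
```

If the lists were reordered (for example, sorted) after `collect()`, only the multiset comparison is meaningful; for idempotent side effects keyed on the partition index, the by-index comparison is the one that matters.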