I am trying to perform the fastest possible lookup in Spark as part of a practice exercise: rolling my own association rules module. I know that the metric below, confidence, is already supported in PySpark; it is only an example. Another metric, lift, is not supported, and I intend to use the results from this discussion to develop it.
To calculate the confidence of a rule, I need to know how often the antecedent and consequent occur together, as well as how often the antecedent occurs across the whole transaction set (in this case, `rdd`).
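Written out, with count(X) the number of transactions containing itemset X and N the total number of transactions, confidence and lift are as follows. Lift needs one extra lookup, count(B), plus N. The worked numbers use the toy transaction set below (N = 9, count({a}) = 5, count({b}) = 6, count({a, b}) = 3).

```latex
\mathrm{conf}(A \Rightarrow B) = \frac{\mathrm{count}(A \cup B)}{\mathrm{count}(A)},
\qquad
\mathrm{lift}(A \Rightarrow B) = \frac{\mathrm{conf}(A \Rightarrow B)}{\mathrm{count}(B) / N}

% Toy data: conf({a} => {b}) = 3/5 = 0.6,  lift({a} => {b}) = 0.6 / (6/9) = 0.9
```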
```python
from itertools import combinations, chain

def powerset(iterable, no_empty=True):
    ''' Produce the powerset for a given iterable '''
    s = list(iterable)
    combos = (combinations(s, r) for r in range(len(s)+1))
    powerset = chain.from_iterable(combos)
    return (el for el in powerset if el) if no_empty else powerset

# Set-up transaction set (sc is an existing SparkContext)
rdd = sc.parallelize(
    [
        ('a',),
        ('a', 'b'),
        ('a', 'b'),
        ('b', 'c'),
        ('a', 'c'),
        ('a', 'b'),
        ('b', 'c'),
        ('c',),
        ('b',),  # trailing comma needed: ('b') is just the string 'b'
    ]
)

# Create an RDD with the counts of each possible itemset.
# Keys become frozensets *before* reduceByKey, so that
# ('a', 'b') and ('b', 'a') are counted under the same key.
counts = (
    rdd
    .flatMap(lambda x: powerset(x))
    .map(lambda x: (frozenset(x), 1))
    .reduceByKey(lambda x, y: x + y)
)

# Confidence of a rule: count(whole itemset) / count(antecedent x[1]).
# lookup() returns a list of matching values, hence the [0].
confidence = lambda x: counts.lookup(frozenset(x))[0] / counts.lookup(frozenset([x[1]]))[0]

confidence_result = (
    rdd
    # Must be applied to length-two and greater itemsets
    .filter(lambda x: len(x) > 1)
    # Referencing the `counts` RDD inside .map() is not allowed (SPARK-5063)
    .map(confidence)
)
```
Anyone familiar with this type of lookup problem will know that it raises this exception:
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
One way to get around this exception is to collect `counts` into a local dictionary:
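```python
# Pull every (itemset, count) pair back to the driver as a plain dict
counts = dict(counts.collect())

# Plain dict lookups are fine inside a transformation: no RDD is referenced
confidence = lambda x: (x, counts[frozenset(x)] / counts[frozenset([x[1]])])

confidence_result = (
    rdd
    # Must be applied to length-two and greater itemsets
    .filter(lambda x: len(x) > 1)
    .map(confidence)
)
```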
This gives me the result I want. However, `counts.collect()` is very expensive, because my real dataset has 50M+ records and every itemset count has to be pulled back to the driver. Is there a better option for performing this type of lookup?
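To have a known-good answer to compare any faster Spark approach against, the counts and confidences for the toy transaction set can be reproduced in plain Python. This is a minimal sketch for checking results only, not a proposal for the 50M-record case. A `collections.Counter` stands in for `reduceByKey`, and `local_counts` and `expected` are illustrative names that are not part of the code above.

```python
from collections import Counter
from itertools import chain, combinations


def powerset(iterable, no_empty=True):
    '''Produce the powerset for a given iterable (same helper as above).'''
    s = list(iterable)
    combos = (combinations(s, r) for r in range(len(s) + 1))
    subsets = chain.from_iterable(combos)
    return (el for el in subsets if el) if no_empty else subsets


# Same toy transaction set as the Spark example, as a plain list
transactions = [
    ('a',), ('a', 'b'), ('a', 'b'), ('b', 'c'), ('a', 'c'),
    ('a', 'b'), ('b', 'c'), ('c',), ('b',),
]

# Local equivalent of flatMap(powerset) -> (frozenset, 1) -> reduceByKey(+)
local_counts = Counter(
    frozenset(subset) for t in transactions for subset in powerset(t)
)

# Same rule as the dict version: count(whole itemset) / count({x[1]})
expected = [
    (t, local_counts[frozenset(t)] / local_counts[frozenset([t[1]])])
    for t in transactions
    if len(t) > 1
]
```

On this data, the dict-based `confidence_result.collect()` should match `expected`.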