2
votes

I have a large RDD, call it RDD1, that is approximately 300 million rows after an initial filter. What I would like to do is take the ids from RDD1 and find all other instances of it in another large dataset, call it RDD2, that is approximately 3 billion rows. RDD2 is created by querying a parquet table that is stored in Hive as well as RDD1. The number of unique ids from RDD1 is approximately 10 million elements.

My approach is to currently collect the ids and broadcast them and then filter RDD2.

My question is - is there a more efficient way to do this? Or is this best practice?

I have the following code -

hiveContext = HiveContext(sc)
RDD1 = hiveContext.sql("select * from table_1")
RDD2 = hiveContext.sql("select * from table_2")

ids = RDD1.map(lambda x: x[0]).distinct() # This is approximately 10 million ids
ids = sc.broadcast(set(ids.collect()))

RDD2_filter = RDD2.rdd.filter(lambda x: x[0] in ids.value)
Why are you selecting all of the rows from table_1, only to throw them away with the RDD1.map() operation? That's not very efficient. The two ids = statements are confusing. Why are you collecting the ids to the driver, only to send them back out as a broadcast variable? Have you tried it without the collect() and the sc.broadcast? – vy32
@vy32 - I used the columns from RDD1 in later calculations, but I need the ids from it to query the second table. As for the collect(): you cannot broadcast an RDD directly; you get a nice Exception. – RDizzl3
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; see SPARK-5063. – RDizzl3

2 Answers

2
votes

I think it would be better to just use a single SQL statement to do the join:

RDD2_filter = hiveContext.sql("""select distinct t2.*
                                 from table_1 t1
                                 join table_2 t2 on t1.id = t2.id""")
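The `distinct` matters: if an id occurs several times in table_1, a plain inner join duplicates the matching table_2 rows. A tiny pure-Python illustration of the difference (not Spark code, just the join semantics):

```python
t1 = [("a",), ("a",), ("b",)]          # id "a" occurs twice in table_1
t2 = [("a", 1), ("b", 2), ("c", 3)]

# Plain inner join: one output row per matching (t1, t2) pair,
# so ("a", 1) comes out twice.
inner = [r2 for r1 in t1 for r2 in t2 if r1[0] == r2[0]]

# Join on distinct ids (semi-join semantics): each t2 row at most once.
ids = {r[0] for r in t1}
semi = [r2 for r2 in t2 if r2[0] in ids]
```

In Spark SQL you can also get the semi-join semantics directly by writing `left semi join` instead of `join` plus `distinct`.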
1
vote

What I would do is take the roughly 10 million distinct ids from RDD1, construct a Bloom filter from them, use that as a broadcast variable to filter RDD2, and you will get an RDD2Partial that contains all key-value pairs for keys that are in RDD1, plus some false positives. If you expect the result to be within the order of millions, then you will be able to use normal operations like join, cogroup, etc. on RDD1 and RDD2Partial to obtain the exact result without any problem.

This way you greatly reduce the cost of the join operation if you expect the result to be of reasonable size, since the expensive join now runs on the much smaller RDD2Partial instead of all of RDD2. You might get a reasonable speedup (e.g. 2-10x) even if the result is within the order of hundreds of millions.
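On sizing (my numbers, not from the question): the usual false-positive estimate for a Bloom filter with n items, m bits and k hash functions is (1 - e^(-kn/m))^k, so you can sanity-check that a filter small enough to broadcast still filters well:

```python
import math

def bloom_fp_rate(n, m, k):
    """Approximate false-positive rate for n items, m bits, k hashes."""
    return (1 - math.exp(-k * n / m)) ** k

n = 10_000_000                    # distinct ids from RDD1
m = 8 * n                         # ~10 MB of bits -> broadcasts easily
k = round(m / n * math.log(2))    # optimal k = (m/n) * ln 2, here 6
rate = bloom_fp_rate(n, m, k)     # roughly 2% false positives
```

At ~2% false positives RDD2Partial stays close to the true result size; doubling m to ~20 MB of bits drops the rate well under 1%.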

EDIT

The bloom filter can be collected efficiently since you can combine the bits set by one element with the bits set by another element with OR, which is associative and commutative.
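A minimal pure-Python sketch of that merge property (a toy `BloomFilter` class of my own, not a Spark or library API): each partition builds a partial filter over its own ids, and the partials can be OR-merged in any order.

```python
import hashlib

class BloomFilter:
    """Toy Bloom filter: m bits held in one big int, k hash functions."""
    def __init__(self, m=1 << 20, k=5):
        self.m, self.k, self.bits = m, k, 0

    def _positions(self, item):
        # Derive k bit positions from salted SHA-256 digests.
        for i in range(self.k):
            h = hashlib.sha256(f"{i}:{item}".encode()).hexdigest()
            yield int(h, 16) % self.m

    def add(self, item):
        for pos in self._positions(item):
            self.bits |= 1 << pos

    def __contains__(self, item):
        return all(self.bits >> pos & 1 for pos in self._positions(item))

    def merge(self, other):
        # OR is associative and commutative, so partial filters built
        # on different partitions can be combined in any order.
        self.bits |= other.bits
        return self

# Two "partitions" build filters independently, then merge:
f1, f2 = BloomFilter(), BloomFilter()
for i in range(0, 100):
    f1.add(i)
for i in range(100, 200):
    f2.add(i)
merged = f1.merge(f2)
```

In Spark this maps onto something like `RDD1.map(lambda x: x[0]).mapPartitions(build_filter).reduce(lambda a, b: a.merge(b))` (hypothetical helper names), broadcasting the merged filter, filtering RDD2 with it, and letting the final exact join on RDD2Partial remove the false positives.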