12 votes
class ProdsTransformer:

    def __init__(self):
        self.products_lookup_hmap = {}
        self.broadcast_products_lookup_map = None

    def create_broadcast_variables(self):
        self.broadcast_products_lookup_map = sc.broadcast(self.products_lookup_hmap)

    def create_lookup_maps(self):
        # The code here builds the hashmap that maps Prod_ID to another space.

pt = ProdsTransformer()
pt.create_broadcast_variables()

pairs = distinct_users_projected.map(lambda x: (x.user_id,
                                                pt.broadcast_products_lookup_map.value[x.Prod_ID]))

I get the following error:

"Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transforamtion. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063."

Any help with how to handle the broadcast variables would be great!

1
That's not enough code or sample data for someone to reproduce the error or fix it. Also, in case you didn't notice, all the indentation has been stripped out of the Python. - Paul
I have added more code. - user3803714
I wonder if the error would go away if you moved the products_lookup_map out of the properties of ProdsTransformer instances and instead made it a global. Do you need more than one map? - Paul
I need multiple maps. - user3803714
OK, I think what the error means is that calling sc or RDD functions is forbidden within the workers, i.e. inside any Spark function like map(), flatMap(), reduce(), etc. You can only call sc.something in the main driver program. So, for instance, you can chain maps, but you can't have a map within a map. And apparently broadcast falls under the same restriction. I've also seen Spark mangle custom classes, so class instances get recreated on workers from serialized data that is shipped around. - Paul
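
To make the restriction in that comment concrete, here is a minimal sketch (assuming a live SparkContext named sc, as in the question; the variable names are hypothetical): transformations may be chained on the driver, but sc must never be touched inside a function that runs on a worker.

rdd = sc.parallelize(range(10))

# Forbidden: using sc (or another RDD's methods) inside a worker-side
# function; this is exactly what SPARK-5063 rejects.
# bad = rdd.map(lambda x: sc.parallelize([x]).count())

# Allowed: chain transformations on the driver instead of nesting them.
good = rdd.map(lambda x: x * 2).map(lambda x: x + 1)
print(good.collect())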

1 Answer

22 votes

When your map lambda references the object that holds your broadcast variable, Spark attempts to serialize that whole object and ship it to the workers. Since the object also holds a reference to the SparkContext, you get the error. Instead of this:

pairs = distinct_users_projected.map(lambda x: (x.user_id, pt.broadcast_products_lookup_map.value[x.Prod_ID]))

Try this:

bcast = pt.broadcast_products_lookup_map
pairs = distinct_users_projected.map(lambda x: (x.user_id, bcast.value[x.Prod_ID]))

The latter avoids any reference to the surrounding object (pt), so Spark only has to ship the broadcast variable itself.
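
For completeness, here is a minimal self-contained sketch of the fix; the lookup data, field names, and Row schema are assumptions, since the question does not show them.

from pyspark import SparkContext
from pyspark.sql import Row

sc = SparkContext(appName="broadcast-fix-sketch")

# Hypothetical stand-ins for the question's data.
products_lookup_hmap = {1: "laptop", 2: "phone"}
bcast = sc.broadcast(products_lookup_hmap)  # broadcast on the driver

distinct_users_projected = sc.parallelize([
    Row(user_id="u1", Prod_ID=1),
    Row(user_id="u2", Prod_ID=2),
])

# The lambda closes over `bcast` only, so Spark ships just the broadcast
# handle to the workers instead of trying to pickle an object that holds
# the SparkContext.
pairs = distinct_users_projected.map(
    lambda x: (x.user_id, bcast.value[x.Prod_ID]))

print(pairs.collect())  # [('u1', 'laptop'), ('u2', 'phone')]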