
I have two Spark 1.4.1 PipelinedRDDs (I am not sure exactly what kind of object that is):

1) a list of ids (ids_alsaciens RDD)

2) a list of persons (personnes RDD)

The 'personnes' RDD has 4 fields per record, in JSON format, the key being "id". There may be several lines for the same person in this table (sharing the same id).

I would like to fetch all the lines of the 'personnes' RDD whose id is contained in the 'ids_alsaciens' RDD.

How could I do that in Spark?

>type(ids_alsaciens)
pyspark.rdd.PipelinedRDD
>type(personnes)
pyspark.rdd.PipelinedRDD

>ids_alsaciens.take(10)
    [u'1933992',
     u'2705919',
     u'2914684',
     u'2915444',
     u'11602833',
     u'11801394',
     u'10707371',
     u'2018422',
     u'2312432',
     u'233375']
>personnes.take(3)
    [{'date': '2013-06-03 00:00',
      'field': 'WAID_INDIVIDU_WC_NUMNNI',
      'id': '10000149',
      'value': '2770278'},
     {'date': '2013-05-15 00:00',
      'field': 'WAID_INDIVIDU_WC_NUMNNI',
      'id': '10009910',
      'value': '2570631'},
     {'date': '2013-03-01 00:00',
      'field': 'WAID_INDIVIDU_WC_NUMNNI',
      'id': '10014405',
      'value': '1840288'}]

EDIT

Tried: personnes.filter(lambda x: x in ids_alsaciens)

Got exception: "Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063."

1 Answer

The SPARK-5063 error occurs because it is not allowed to reference an RDD from inside a transformation such as map: the Spark worker that runs the map task has no access to other RDDs, which only exist on the driver.

Use Spark RDD.join:

From documentation

join(otherDataset, [numTasks])    

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key

The secret is to know that Spark treats all 2-tuples as (key, value) pairs, and that you can use RDD.map() to build your own pairs:

kv_ids_alsaciens = ids_alsaciens.map(lambda id: (id, 0))

makes (k, v) pairs from ids_alsaciens where k = id and v = 0. The dummy value is a little wasteful, but I have not tested whether you can eliminate it.

Then with personnes:

kv_personnes = personnes.map(lambda p: (p['id'],p))

Now we can use join like so:

joined_kv_ids_alsaciens_personnes = kv_ids_alsaciens.join(kv_personnes)

which will be an RDD with entries like

('10000149', (0, {'date': '2013-06-03 00:00', 'field': 'WAID_INDIVIDU_WC_NUMNNI', 'id': '10000149', 'value': '2770278'}))

where the first item is a matching id, and the second item is a pair (match1, match2), where match1 is always 0 because our first dataset always carried 0 as the value, and match2 is a dict of the personnes data.
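To make the join semantics concrete, here is a plain-Python sketch (not Spark code) of what an inner join on (key, value) pairs produces, using tiny made-up lists standing in for the two RDDs:

```python
from collections import defaultdict

def inner_join(left, right):
    # Mimics RDD.join: for each key present on both sides,
    # emit (key, (left_value, right_value)) for every pairing.
    by_key = defaultdict(list)
    for k, w in right:
        by_key[k].append(w)
    return [(k, (v, w)) for k, v in left for w in by_key[k]]

ids = [('10000149', 0), ('10009910', 0)]       # like kv_ids_alsaciens
rows = [('10000149', {'id': '10000149'}),      # like kv_personnes
        ('99999999', {'id': '99999999'})]

print(inner_join(ids, rows))
# [('10000149', (0, {'id': '10000149'}))]  -- only ids on both sides survive
```

Spark distributes this work by partitioning both RDDs on the key, but the result per key is the same.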

This is not quite what is needed. A better format may be to emit only the dict. We can do that with another map.

match_personnes = joined_kv_ids_alsaciens_personnes.map(lambda kv: kv[1][1])

(written with indexing rather than a tuple-unpacking lambda, since tuple unpacking in lambdas only works on Python 2)

All together, with cache() to keep the final result in memory:

match_personnes = (ids_alsaciens
                   .map(lambda id: (id, 0))
                   .join(personnes.map(lambda p: (p['id'],p)))
                   .map(lambda kv: kv[1][1])
                   .cache()
                   )

To test:

match_personnes.take(10)
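As an aside: since ids_alsaciens is just a list of ids, an alternative that avoids the join is to collect the ids to the driver as a Python set and use it inside filter; Spark ships the set to the workers (or you can broadcast it explicitly with sc.broadcast). In Spark this would be set(ids_alsaciens.collect()) followed by personnes.filter(lambda p: p['id'] in id_set). A plain-Python sketch of that driver-side logic, with made-up sample data:

```python
# Hypothetical small samples standing in for the RDD contents
ids_alsaciens_local = ['10000149', '2770278']
personnes_local = [
    {'id': '10000149', 'value': '2770278'},
    {'id': '10014405', 'value': '1840288'},
]

# In Spark: id_set = set(ids_alsaciens.collect())
#           match_personnes = personnes.filter(lambda p: p['id'] in id_set)
id_set = set(ids_alsaciens_local)
matches = [p for p in personnes_local if p['id'] in id_set]

print(matches)
# [{'id': '10000149', 'value': '2770278'}]
```

This only works when the id list comfortably fits in memory on the driver and workers; for very large id sets, the join approach above scales better.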