4
votes

I want to pass few extra parameters to the python function from the mappartition. Any suggestions..

My sample code looks like this

 def test(x,abc):
   <<code>>

 abc =1234
 df = df.repartition("key")
 res= df.rdd.mapPartitions(test, abc)

If i pass abc as parameter and use it inside the test funtion i am getting below error

Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

Mariusz please find the change

from pyspark.sql import Row
def test(abc):
    def my_map_partitions(x):
       print("----------start-----------")
       cnt=1
       ret = []
       for i in x:
         cnt=cnt+1
         val = Row(key1=i.key1, key2=i.key2, cnt=cnt)
         ret.append(val)
       return ret 
    return my_map_partitions
df = df.repartition("key1key2").sortWithinPartitions("key1key2")  
abc123 = df .rdd.mapPartitions(test(abc)) 
1

1 Answers

0
votes

Try creating function that returns function, for example:

def test(abc):
    def my_map_partitions(partition):
        ...do something with partition and abc...
    return my_map_partitions

df.rdd.mapPartitions(test(abc))