I want to pass a few extra parameters to the Python function used in mapPartitions. Any suggestions?
My sample code looks like this:
def test(x, abc):
    <<code>>

abc = 1234
df = df.repartition("key")
res = df.rdd.mapPartitions(test, abc)
If I pass abc as a parameter and use it inside the test function, I get the error below:
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
Mariusz, please find the change:
from pyspark.sql import Row

def test(abc):
    def my_map_partitions(x):
        print("----------start-----------")
        cnt = 1
        ret = []
        for i in x:
            cnt = cnt + 1
            val = Row(key1=i.key1, key2=i.key2, cnt=cnt)
            ret.append(val)
        return ret
    return my_map_partitions
df = df.repartition("key1", "key2").sortWithinPartitions("key1", "key2")
abc123 = df.rdd.mapPartitions(test(abc))