1
votes

I am trying to do binning on a particular column in dataframe based on the data given in the dictionary.

Below is the dataframe I used:

df

SNo,Name,CScore
1,x,700
2,y,850
3,z,560
4,a,578
5,b,456
6,c,678

I have created the below function,it is working fine if I use it seperately.


def binning(column,dict):
    finalColumn=[]
    lent = len(column)
    for i in range(lent):
        for j in range(len(list(dict))):
            if( int(column[i]) in range(list(dict)[j][0],list(dict)[j][1])):
                finalColumn.append(dict[list(dict)[j]])
    return finalColumn

I have used the above function in the below statement.

newDf = df.withColumn("binnedColumn",binning(df.select("CScore").rdd.flatMap(lambda x: x).collect(),{(1,400):'Low',(401,900):'High'}))

I am getting the below error:

Traceback (most recent call last): File "", line 1, in File "C:\spark_2.4\python\pyspark\sql\dataframe.py", line 1988, in withColumn assert isinstance(col, Column), "col should be Column" AssertionError: col should be Column

Any help to solve this issue will be of great help.Thanks.

1
You need to make binning into a user defined function (udf). Also don't name your variables dict. Also calling collect() inside withColumn is going to give you terrible performance.You can probably achieve the same result using when() and between().pault
I have used the udf and renamed the dict variable ,even though it is not working.Any other suggestion ? Thanks..Vineel
Please edit your question to include a reproducible example which includes the exact code you've tried and the full error messages or an explanation of why it's not working. It's also recommended that you also take the tour and read How to Ask.pault

1 Answers

1
votes

Lets start with creating the data:

rdd = sc.parallelize([[1,"x",700],[2,"y",850],[3,"z",560],[4,"a",578],[5,"b",456],[6,"c",678]])
df = rdd.toDF(["SNo","Name","CScore"])
>>> df.show()
+---+----+------+
|SNo|Name|CScore|
+---+----+------+
|  1|   x|   700|
|  2|   y|   850|
|  3|   z|   560|
|  4|   a|   578|
|  5|   b|   456|
|  6|   c|   678|
+---+----+------+

if your final goal is to provide a binning_dictionary like your example do, to find the corresponding categorical value. udf is the solution.

the following is your normal function.

bin_lookup = {(1,400):'Low',(401,900):'High'}
def binning(value, lookup_dict=bin_lookup):
    for key in lookup_dict.keys():
        if key[0] <= value <= key[1]:
             return lookup_dict[key]

to register as a udf and run it via dataframe:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

binning_udf = udf(binning, StringType())
>>> df.withColumn("binnedColumn", binning_udf("CScore")).show()
+---+----+------+------------+
|SNo|Name|CScore|binnedColumn|
+---+----+------+------------+
|  1|   x|   700|        High|
|  2|   y|   850|        High|
|  3|   z|   560|        High|
|  4|   a|   578|        High|
|  5|   b|   456|        High|
|  6|   c|   678|        High|
+---+----+------+------------+

directly apply to rdd:

>>> rdd.map(lambda row: binning(row[-1], bin_lookup)).collect()
['High', 'High', 'High', 'High', 'High', 'High']