2
votes

My question is relevant to my previous one: transform columns values to columns in pyspark dataframe

I have created a table "my_df" (a dataframe in pyspark):

+----+--------+---------------------------------+
|id  |payment        |shop                      |
+----+--------+---------------------------------+
|dapd|[credit, cash] |[retail, on-line]         |
|wrfr|[cash, debit]  |[supermarket, brand store]|
+----+--------+---------------------------------+

Now, I need to do clustering for the table such that I can find the similarity of the "id"s. I am trying k-means at first. So, I need to transform the categorical values to numerical values by one-hot encoding. I am referring How to handle categorical features with spark-ml?

my code:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

inputs, my_indx_list = [], []
for a_col in my_df.columns: 
  my_indx = StringIndexer(inputCol = a_col, outputCol = a_col + "_index")
  inputs.append(my_indx.getOutputCol())
  my_indx_list.append(my_indx)

  encoder = OneHotEncoderEstimator(inputCols=inputs, outputCols=[x + "_vector" for x in inputs])
  a_pipeline = Pipeline(stages = my_indx_list + [encoder])
  pipeline.fit(my_df).transform(my_df).show() # error here !

But, I got error:

A column must be either string type or numeric type, but got ArrayType(StringType,true)

So, how I can solve this?

My idea: sort the list value of each column, and concatenate each string in the list to a long string for each column.

But, for each column, the values are the answers for some survey questions and each answer has the same weight. I am not sure how to work it out ?

thanks

UPDATE

Based on the proposed solution, it works but it is very slow. It took about 3.5 hours on a cluster with 300 GB memory and 32 cores.

my code:

   from pyspark.ml.feature import CountVectorizer
   tmp_df = original_df # 3.5 million rows and 300 columns

   for a_col in original_df.columns: 
        a_vec = CountVectorizer(inputCol = a_col, outputCol = a_col + "_index", binary=True)
        tmp_df = a_vec.fit(tmp_df).transform(tmp_df)

  tmp_df.show()

The "original_df" has 3.5 million rows and 300 columns.

How can I speed up ?

thanks

1
How do you get the ouput like that, is it manual? the + - 'soppressionslayer
Can you add the line of code that is returning the error?Jortega
Code added, thanksuser3448011
For two array columns, use CountVectorizer with binary=True: spark.apache.org/docs/2.4.0/api/python/…jxc
Please have a look at this: stackoverflow.com/questions/58303468/…pissall

1 Answers

2
votes

@jxc suggested the brilliant use of CountVectorizer for one-hot encoding in your case, which is usually used for counting tokens in natural language processing.

Using CountVectorizer saves you troubles in dealing with explode and collect_set with OneHotEncoderEstimator; or worse if you try to implement it using udf.

Given this dataframe,

df = spark.createDataFrame([
                            {'id': 'dapd', 'payment': ['credit', 'cash'], 'shop': ['retail', 'on-line']},
                            {'id': 'wrfr', 'payment': ['cash', 'debit'], 'shop': ['supermarket', 'brand store']}
                           ])
df.show()

+----+--------------+--------------------+
|  id|       payment|                shop|
+----+--------------+--------------------+
|dapd|[credit, cash]|   [retail, on-line]|
|wrfr| [cash, debit]|[supermarket, bra...|
+----+--------------+--------------------+

You can one-hot encode by treating the array of strings as tokens in natural language processing. Note the use of binary=True to force it to return only 0 or 1.

from pyspark.ml.feature import CountVectorizer

payment_cv = CountVectorizer(inputCol="payment", outputCol="paymentEnc", binary=True)
first_res_df = payment_cv.fit(df).transform(df)

shop_cv = CountVectorizer(inputCol="shop", outputCol="shopEnc", binary=True)
final_res_df = shop_cv.fit(first_res_df).transform(first_res_df)

final_res_df.show()

+----+--------------+--------------------+-------------------+-------------------+
|  id|       payment|                shop|         paymentEnc|            shopEnc|
+----+--------------+--------------------+-------------------+-------------------+
|dapd|[credit, cash]|   [retail, on-line]|(3,[0,2],[1.0,1.0])|(4,[0,3],[1.0,1.0])|
|wrfr| [cash, debit]|[supermarket, bra...|(3,[0,1],[1.0,1.0])|(4,[1,2],[1.0,1.0])|
+----+--------------+--------------------+-------------------+-------------------+