My question is related to my previous one: transform columns values to columns in pyspark dataframe
I have created a table "my_df" (a dataframe in pyspark):
+----+---------------+--------------------------+
|id  |payment        |shop                      |
+----+---------------+--------------------------+
|dapd|[credit, cash] |[retail, on-line]         |
|wrfr|[cash, debit]  |[supermarket, brand store]|
+----+---------------+--------------------------+
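For reference, a minimal snippet to reproduce this dataframe (just a toy version with the same schema: "id" is a string, "payment" and "shop" are arrays of strings):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# toy dataframe with the same schema as my_df
my_df = spark.createDataFrame(
    [("dapd", ["credit", "cash"], ["retail", "on-line"]),
     ("wrfr", ["cash", "debit"], ["supermarket", "brand store"])],
    ["id", "payment", "shop"])
my_df.show(truncate=False)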
Now, I need to cluster this table so that I can find the similarity between the "id"s. I am trying k-means first, so I need to transform the categorical values into numerical values with one-hot encoding. I am referring to How to handle categorical features with spark-ml?
my code:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

inputs, my_indx_list = [], []
for a_col in my_df.columns:
    # index each categorical column, then one-hot encode the resulting indices
    my_indx = StringIndexer(inputCol=a_col, outputCol=a_col + "_index")
    inputs.append(my_indx.getOutputCol())
    my_indx_list.append(my_indx)

encoder = OneHotEncoderEstimator(inputCols=inputs, outputCols=[x + "_vector" for x in inputs])
a_pipeline = Pipeline(stages=my_indx_list + [encoder])
a_pipeline.fit(my_df).transform(my_df).show()  # error here!
But I got this error:
A column must be either string type or numeric type, but got ArrayType(StringType,true)
So, how can I solve this?
My idea: sort the list in each column and concatenate its strings into one long string per column, as in the sketch below.
But for each column, the values are answers to survey questions, and each answer should carry the same weight, so I am not sure how to work this out.
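Just to illustrate that idea, a rough sketch (assuming Spark 2.4+, so that array_sort is available). It turns each sorted list into a single string that StringIndexer can handle, but it treats every distinct combination of answers as one category, which is probably not what I want if each answer should have the same weight:

from pyspark.sql import functions as F

# sort each array and join its elements into one delimited string per column
str_df = my_df
for a_col in ["payment", "shop"]:
    str_df = str_df.withColumn(a_col, F.concat_ws("|", F.array_sort(F.col(a_col))))
str_df.show(truncate=False)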
thanks
UPDATE
The proposed solution works, but it is very slow: it took about 3.5 hours on a cluster with 300 GB of memory and 32 cores.
my code:
from pyspark.ml.feature import CountVectorizer

tmp_df = original_df  # 3.5 million rows and 300 columns
for a_col in original_df.columns:
    # binary=True: only record presence/absence of each value, not its count
    a_vec = CountVectorizer(inputCol=a_col, outputCol=a_col + "_index", binary=True)
    tmp_df = a_vec.fit(tmp_df).transform(tmp_df)

tmp_df.show()
The "original_df" has 3.5 million rows and 300 columns.
How can I speed this up?
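One idea I was thinking of trying (an untested sketch using the same CountVectorizer approach): cache the input and fit all the columns through a single Pipeline instead of chaining 300 separate fit/transform calls, although I do not know how much it actually helps:

from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer

original_df.cache()  # avoid recomputing the source data for every stage

# one CountVectorizer stage per column, fitted by a single Pipeline
stages = [CountVectorizer(inputCol=c, outputCol=c + "_index", binary=True)
          for c in original_df.columns]
tmp_df = Pipeline(stages=stages).fit(original_df).transform(original_df)
tmp_df.show()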
thanks
(Comment from jxc: for binary=True, see spark.apache.org/docs/2.4.0/api/python/…)