I know KMeans is not a good choice for categorical data, but we don't have many options in Spark 1.4 for clustering categorical data.
That issue aside, I'm getting errors in the code below.
I read my table from Hive, use OneHotEncoder in a pipeline, and then pass the encoded data to KMeans.
Running this code fails with the error shown below.
Could the error be in the data type fed to KMeans? Does it expect NumPy array data? If so, how can I convert my indexed data to a NumPy array?
All comments are appreciated, and thanks for your help!
The error I'm getting:
Traceback (most recent call last):
  File "/usr/hdp/2.3.2.0-2950/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 157, in manager
  File "/usr/hdp/2.3.2.0-2950/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 61, in worker
  File "/usr/hdp/2.3.2.0-2950/spark/python/lib/pyspark.zip/pyspark/worker.py", line 136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/hdp/2.3.2.0-2950/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 544, in read_int
    raise EOFError
EOFError
File "", line 1
Traceback (most recent call last):
My code:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import KMeans

# aline will be passed in from another RDD
aline = ["xxx", "yyy"]

# Get data from the Hive table, select the columns, and convert to an RDD
# (hCtx is assumed to be an existing HiveContext)
rddRes2 = hCtx.sql("select XXX, YYY from table1 where xxx <> ''")
rdd3 = rddRes2.rdd

# Fill the NA (empty) values with "none"
Rdd4 = rdd3.map(lambda line: [x if len(x) else 'none' for x in line])

# Convert it back to a DataFrame
DataDF = Rdd4.toDF(aline)

# Indexers encode strings as doubles
string_indexers = [
    StringIndexer(inputCol=x, outputCol="idx_{0}".format(x))
    for x in DataDF.columns if x not in ''
]

# Encoders turn each index column into a one-hot vector
encoders = [
    OneHotEncoder(inputCol="idx_{0}".format(x), outputCol="enc_{0}".format(x))
    for x in DataDF.columns if x not in ''
]

# Assemble multiple columns into a single vector
assembler = VectorAssembler(
    inputCols=["enc_{0}".format(x) for x in DataDF.columns if x not in ''],
    outputCol="features")

pipeline = Pipeline(stages=string_indexers + encoders + [assembler])
model = pipeline.fit(DataDF)
indexed = model.transform(DataDF)
labeled_points = indexed.select("features").map(lambda row: LabeledPoint(row.features))

# Build the model (cluster the data)
clusters = KMeans.train(labeled_points, 3, maxIterations=10, runs=10, initializationMode="random")
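A side note on the NA-filling step in the code above: Hive NULL values arrive in PySpark rows as None, and len(None) raises a TypeError inside the worker. Whether table1 actually contains NULLs is not known from the post, so this is only a possible contributor. A minimal sketch of a None-safe replacement follows; the helper name fill_empty and its placeholder parameter are hypothetical, not part of the original code.

```python
def fill_empty(row, placeholder="none"):
    """Replace None or empty-string fields in a row with a placeholder.

    Hypothetical helper: handles both '' and None, whereas len(x) would
    raise TypeError on None.
    """
    return [placeholder if x is None or x == "" else x for x in row]
```

It could then be used in place of the lambda, for example Rdd4 = rdd3.map(fill_empty).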
LabeledPoint(row.features) is not a valid call to LabeledPoint. – zero323
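To illustrate the comment above: LabeledPoint takes two arguments, a label and a feature vector, so calling it with only the features fails in the Python worker. KMeans is unsupervised and needs no labels. KMeans.train takes an RDD of vectors, and array-like values such as NumPy arrays or lists are also accepted, so no manual NumPy conversion should be needed. The sketch below assumes the indexed DataFrame produced by the pipeline in the question. The function names to_feature_vector and cluster_features are hypothetical, and this has not been checked against the poster's cluster.

```python
def to_feature_vector(row):
    """Return the assembled feature vector from a row with a `features` field."""
    return row.features


def cluster_features(indexed, k=3):
    """Cluster the `features` column of a transformed DataFrame with MLlib KMeans.

    `indexed` is assumed to be the DataFrame returned by model.transform(DataDF).
    """
    # Imported lazily so to_feature_vector can be used without a Spark installation.
    from pyspark.mllib.clustering import KMeans

    # KMeans.train takes an RDD of vectors; no LabeledPoint wrapper is needed.
    vectors = indexed.select("features").rdd.map(to_feature_vector)
    return KMeans.train(vectors, k, maxIterations=10, initializationMode="random")
```

With the question's variables this would be called as clusters = cluster_features(indexed, 3).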