I'm completely new to Spark and currently I'm trying to use Python to write a simple code that does KMeans on a set of data.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
import re
from pyspark.mllib.clustering import KMeans, KMeansModel
from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.linalg import SparseVector
from numpy import array
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
import pandas as pd
import numpy
df = pd.read_csv("/<path>/Wholesale_customers_data.csv")
sql_sc = SQLContext(sc)
cols = ["Channel", "Region", "Fresh", "Milk", "Grocery", "Frozen", "Detergents_Paper", "Delicassen"]
s_df = sql_sc.createDataFrame(df)
vectorAss = VectorAssembler(inputCols=cols, outputCol="feature")
vdf = vectorAss.transform(s_df)
km = KMeans.train(vdf, k=2, maxIterations=10, runs=10, initializationMode="k-means||")
model = kmeans.fit(vdf)
cluster = model.clusterCenters()
print(cluster)
I typed these into pyspark shell, and when it runs model = kmeans.fit(vdf), I got the following errors:
TypeError: Cannot convert type into Vector
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) at org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:207) at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313) at org.apache.spark.rdd.RDD.iterator(RDD.scala:277) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69) at org.apache.spark.rdd.RDD.iterator(RDD.scala:275) at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313) at org.apache.spark.rdd.RDD.iterator(RDD.scala:277) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313) at org.apache.spark.rdd.RDD.iterator(RDD.scala:277) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 17/02/26 23:31:58 ERROR Executor: Exception in task 6.0 in stage 23.0 (TID 113) org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main process() File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream vs = list(itertools.islice(iterator, batch)) File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/init.py", line 77, in _convert_to_vector raise TypeError("Cannot convert type %s into Vector" % type(l)) TypeError: Cannot convert type into Vector The
data I got is from: https://archive.ics.uci.edu/ml/machine-learning-databases/00292/Wholesale%20customers%20data.csv
Could someone please tell me what is going wrong here and what I missed? I appreciate any help.
Thank you!
UPDATE: @Garren The errors I got is:
The errors I got is: >>> kmm = kmeans.fit(s_df)17/03/02 21:58:01 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:56193 in memory (size: 5.8 KB, free: 511.1 MB) 17/03/02 21:58:01 INFO ContextCleaner: Cleaned accumulator 5 17/03/02 21:58:01 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:56193 in memory (size: 5.8 KB, free: 511.1 MB) 17/03/02 21:58:01 INFO ContextCleaner: Cleaned accumulator 4
Traceback (most recent call last): File "", line 1, in File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/ml/pipeline.py", line 69, in fit return self._fit(dataset) File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/ml/wrapper.py", line 133, in _fit java_model = self._fit_java(dataset) File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/ml/wrapper.py", line 130, in _fit_java return self._java_obj.fit(dataset._jdf) File "/usr/hdp/2.5.0.0-1245/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in call File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/sql/utils.py", line 51, in deco raise AnalysisException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.AnalysisException: u"cannot resolve 'features' given input columns: [Channel, Grocery, Fresh, Frozen, Detergents_Paper, Region, Delicassen, Milk];"