I want to run k-means in Spark on data read from MongoDB. I have a working example that runs against a flat file:

from numpy import array
from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans

sc = SparkContext(appName="KMeansExample")  # SparkContext
data = sc.textFile("/home/mhoeller/kmeans_data.txt")
parsedData = data.map(lambda line: array([int(x) for x in line.split(' ')]))
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")

This is the format of the flat file:

0 0 1
1 1 1
2 2 2
9 9 6
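For reference, the parsed RDD can be inspected quickly (a small check, assuming the snippet above has run):

# Hypothetical check: look at the first two parsed rows
print(parsedData.take(2))   # e.g. [array([0, 0, 1]), array([1, 1, 1])]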

Now I want to replace the flat file with MongoDB:

spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/ycsb.usertable") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/ycsb.usertable") \
    .getOrCreate()

df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://127.0.0.1/ycsb.usertable").load()

# <<<< Here I am missing the parsing >>>>>

clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")

I'd like to understand how to map the data from df so that it can be used as input for k-means.

The "layout" of the database is:
root
|-- _id: string (nullable = true)
|-- field0: binary (nullable = true)
|-- field1: binary (nullable = true)
|-- field2: binary (nullable = true)
|-- field3: binary (nullable = true)
|-- field4: binary (nullable = true)
|-- field5: binary (nullable = true)
|-- field6: binary (nullable = true)
|-- field7: binary (nullable = true)
|-- field8: binary (nullable = true)
|-- field9: binary (nullable = true)
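For reference, that layout can be reproduced directly from the loaded DataFrame (assuming the df from the snippet above):

df.printSchema()   # prints the root / field listing shown above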


1 Answer


"I'd like to understand how to map the data from df so that it can be used as input for k-means."

Based on your snippet, I assume you're using PySpark.

If you look at the clustering.KMeans Python API docs, you can see that the first parameter needs to be an RDD of Vector or of convertible sequence types.
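For illustration, a minimal call that satisfies this requirement could look like the sketch below (toy data, assuming a SparkContext sc is available; not your MongoDB data):

from numpy import array
from pyspark.mllib.clustering import KMeans

# Toy RDD of numpy arrays -- each element is convertible to a Vector
toy = sc.parallelize([array([0.0, 0.0]), array([1.0, 1.0]), array([9.0, 8.0])])
model = KMeans.train(toy, 2, maxIterations=10, initializationMode="random")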

After you run the code below, which loads the data from MongoDB using the MongoDB Spark Connector:

df = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
               .option("uri", "mongodb://127.0.0.1/ycsb.usertable") \
               .load()

What you have in df is a DataFrame, so we need to convert it into something convertible to a Vector type.

Since you are using numpy.array in your text-file example, we can keep using that array type to make the transition easier.

Based on the provided layout, first we need to drop the _id column, since it isn't needed for the clustering training. See also the Vector data type for more information.

With the above information, let's get into it:

# Drop _id column and get RDD representation of the DataFrame
rowRDD = df.drop("_id").rdd

# Convert RDD of Row into RDD of numpy.array
parsedRdd = rowRDD.map(lambda row: array([int(x) for x in row]))

# Feed into KMeans 
clusters = KMeans.train(parsedRdd, 2, maxIterations=10, initializationMode="random")

If you would like to keep boolean values (True/False) instead of integers (1/0), you can drop the int conversion, as below:

parsedRdd = rowRDD.map(lambda row: array([x for x in row]))
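Alternatively, if you prefer MLlib's own Vector type over numpy arrays, the same mapping could be written like this (a sketch, using the same rowRDD as above):

from pyspark.mllib.linalg import Vectors

# Build dense MLlib Vectors instead of numpy arrays
parsedRdd = rowRDD.map(lambda row: Vectors.dense([int(x) for x in row]))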

Putting it all together:

from numpy import array 
from pyspark.mllib.clustering import KMeans
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/ycsb.usertable") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/ycsb.usertable") \
    .getOrCreate()

df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

rowRDD = df.drop("_id").rdd
parsedRdd = rowRDD.map(lambda row: array([int(x) for x in row]))

clusters = KMeans.train(parsedRdd, 2, maxIterations=10, initializationMode="random")
clusters.clusterCenters
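
As a quick sanity check after training (a sketch, assuming the pipeline above ran), you can print the learned centers and predict the cluster of a single row:

# Print each learned cluster center
for center in clusters.clusterCenters:
    print(center)

# Predict the cluster index for the first parsed row
print(clusters.predict(parsedRdd.first()))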