I want to run k-means within Spark on data provided from MongoDB. I have a working example which works against a flat file:
from numpy import array
from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans

sc = SparkContext(appName="KMeansExample")  # SparkContext
data = sc.textFile("/home/mhoeller/kmeans_data.txt")
parsedData = data.map(lambda line: array([int(x) for x in line.split(' ')]))
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
This is the format of the flat file:
0 0 1
1 1 1
2 2 2
9 9 6
Now I want to replace the flat file with MongoDB:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/ycsb.usertable") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/ycsb.usertable") \
    .getOrCreate()
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://127.0.0.1/ycsb.usertable").load()
# <<<< Here I am missing the parsing >>>>>
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
I'd like to understand how to map the data from df so that it can be used as input for KMeans (a rough sketch of one possible approach follows the schema below).
The "layout" of the database is:
root
|-- _id: string (nullable = true)
|-- field0: binary (nullable = true)
|-- field1: binary (nullable = true)
|-- field2: binary (nullable = true)
|-- field3: binary (nullable = true)
|-- field4: binary (nullable = true)
|-- field5: binary (nullable = true)
|-- field6: binary (nullable = true)
|-- field7: binary (nullable = true)
|-- field8: binary (nullable = true)
|-- field9: binary (nullable = true)
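
One possible sketch of the missing parsing step is shown below. It assumes the binary field0..field9 columns actually hold UTF-8 encoded numbers; if the collection contains YCSB's default random byte payloads, the decode step would need to be replaced with whatever decoding matches the real data:

from numpy import array
from pyspark.mllib.clustering import KMeans

field_cols = ["field{}".format(i) for i in range(10)]  # field0 .. field9

def to_point(row):
    # each binary column comes back as a bytearray; decode it and cast to int
    return array([int(bytes(row[c]).decode("utf-8")) for c in field_cols])

parsedData = df.select(field_cols).rdd.map(to_point)
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")

After training, the result could be inspected via clusters.clusterCenters to check that the two centers look plausible for the data.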