I am trying to build a clustering mechanism using
- Google Dataproc + Spark
- Google Bigquery
- Create a job using Spark ML KMeans+pipeline
As follows:
Create user level based feature table in bigquery
Example: How the feature table looks likeuserid |x1 |x2 |x3 |x4 |x5 |x6 |x7 |x8 |x9 |x10
00013 |0.01 | 0 |0 |0 |0 |0 |0 |0.06 |0.09 | 0.001- Spin up a default setting cluster, am using gcloud command line interface to create the cluster and run jobs as shown here
- Using the starter code provided, I read the BQ table, convert RDD into a Dataframe and pass to KMeans model/pipeline:
#!/usr/bin/python
"""BigQuery I/O PySpark example."""
import json
import pprint
import subprocess
import pyspark
import numpy as np
from pyspark.ml.clustering import KMeans
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext
from pyspark.mllib.linalg import Vectors, _convert_to_vector
from pyspark.sql.types import Row
from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _py2java, _java2py
sc = pyspark.SparkContext()
# Use the Google Cloud Storage bucket for temporary BigQuery export data used by the InputFormat.
# This assumes the Google Cloud Storage connector for Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory ='gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
conf = {# Input Parameters
'mapred.bq.project.id': project,
'mapred.bq.gcs.bucket': bucket,
'mapred.bq.temp.gcs.path': input_directory,
'mapred.bq.input.project.id': 'my-project',
'mapred.bq.input.dataset.id': 'tempData',
'mapred.bq.input.table.id': 'userFeatureInBQ'}
# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
'org.apache.hadoop.io.LongWritable',
'com.google.gson.JsonObject',conf=conf)
# Tranform the userid-Feature table into feature_data RDD
feature_data = (
table_data
.map(lambda (_, record): json.loads(record))
.map(lambda x:(x['x0'],x['x1'],x['x2'],x['x3'],x['x4'],
x['x5'],x['x6'],x['x7'],x['x8'],
x['x9'],x['x10'])))
# Function to convert each line in RDD into an array, return the vector
def parseVector(values):
array = np.array([float(v) for v in values])
return _convert_to_vector(array)
# Convert the RDD into a row wise RDD
data = feature_data.map(parseVector)
row_rdd = data.map(lambda x: Row(x))
sqlContext = SQLContext(sc)
# cache the RDD to improve performance
row_rdd.cache()
# Create a Dataframe
df = sqlContext.createDataFrame(row_rdd, ["features"])
# cache the Dataframe
df.cache()
Here is the Schema and head() which I print to the console:
|-- features: vector (nullable = true)
[Row(features=DenseVector([0.01,0,0,0,0,0,0,0.06,0.09,0.001]))]
- Run the clustering KMeans algorithm in following manner
- Run the model multiple times
- With different parameters (Namely, change the #clusters and init_mode)
- Calculate error or Cost metric
- Choose best model-parameter combination
- Create pipeline with KMeans as an estimator
- Pass multiple parameters using paramMap
#Define the paramMap & model
paramMap = ({'k':3,'initMode':'kmeans||'},{'k':3,'initMode':'random'},
{'k':4,'initMode':'kmeans||'},{'k':4,'initMode':'random'},
{'k':5,'initMode':'kmeans||'},{'k':5,'initMode':'random'},
{'k':6,'initMode':'kmeans||'},{'k':6,'initMode':'random'},
{'k':7,'initMode':'kmeans||'},{'k':7,'initMode':'random'},
{'k':8,'initMode':'kmeans||'},{'k':8,'initMode':'random'},
{'k':9,'initMode':'kmeans||'},{'k':9,'initMode':'random'},
{'k':10,'initMode':'kmeans||'},{'k':10,'initMode':'random'})
km = KMeans()
# Create a Pipeline with estimator stage
pipeline = Pipeline(stages=[km])
# Call & fit the pipeline with the paramMap
models = pipeline.fit(df, paramMap)`
print models
I get the following output with a warning
7:03:24 WARN org.apache.spark.mllib.clustering.KMeans: The input data was not directly cached, which may hurt performance if its parent RDDs are also uncached.
[PipelineModel_443dbf939b7bd3bf7bfc, PipelineModel_4b64bb761f4efe51da50, PipelineModel_4f858411ac19beacc1a4, PipelineModel_4f58b894f1d14d79b936, PipelineModel_4b8194f7a5e6be6eaf33, PipelineModel_4fc5b6370bff1b4d7dba, PipelineModel_43e0a196f16cfd3dae57, PipelineModel_47318a54000b6826b20e, PipelineModel_411bbe1c32db6bf0a92b, PipelineModel_421ea1364d8c4c9968c8, PipelineModel_4acf9cdbfda184b00328, PipelineModel_42d1a0c61c5e45cdb3cd, PipelineModel_4f0db3c394bcc2bb9352, PipelineModel_441697f2748328de251c, PipelineModel_4a64ae517d270a1e0d5a, PipelineModel_4372bc8db92b184c05b0]
#Print the cluster centers:
for model in models:
print vars(model)
print model.stages[0].clusterCenters()
print model.extractParamMap()
Output:
[array([7.64676638e-07, 3.58531391e-01, 1.68879698e-03, 0.00000000e+00, 1.53477043e-02, 1.25822915e-02, 0.00000000e+00, 6.93060772e-07, 1.41766847e-03, 1.60941306e-02], array([2.36494105e-06, 1.87719732e-02, 3.73829379e-03, 0.00000000e+00, 4.20724542e-02, 2.28675684e-02, 0.00000000e+00, 5.45002249e-06, 1.17331153e-02, 1.24364600e-02])
Here it the list of questions and need help with:
- I get a list with only 2 cluster centers as arrays for all models,
- It seems the KMeans models is defaulting to k=2 when I try to access the pipeline? Why would this happen?
- The last loop is supposed to access the pipelineModel and the 0th stage and run the clusterCenter() method? Is this the right method?
- Why do I get the error that data is uncached?
- I could not find how to compute the WSSSE or any equivalent method like .computeCost()(for mllib) when using a pipeline? How can I compare the different models based on different parameters?
- I tried the following code to run the .computeCost method as defined in the source code here:
- This defeats the purpose of running KMeans model and model selection in parallel using pipeline, however I have tried the following code:
#computeError
def computeCost(model, rdd):`
"""Return the K-means cost (sum of squared distances of
points to their nearest center) for this model on the given data."""
cost = callMLlibFunc("computeCostKmeansModel",
rdd.map(_convert_to_vector),
[_convert_to_vector(c) for c in model.clusterCenters()])
return cost
cost= np.zeros(len(paramMap))
for i in range(len(paramMap)):
cost[i] = cost[i] + computeCost(model[i].stages[0], feature_data)
print cost
This prints out the following at the end of the loop:
[ 634035.00294687 634035.00294687 634035.00294687 634035.00294687
634035.00294687 634035.00294687 634035.00294687 634035.00294687
634035.00294687 634035.00294687 634035.00294687 634035.00294687
634035.00294687 634035.00294687 634035.00294687 634035.00294687]
- The cost/error calculated is the same for each model? Again cannot access the pipelineModel with the correct parameters.
Any help/ guidance is much appreciated! Thanks!