
I am trying to use Spark summary statistics as described at: https://spark.apache.org/docs/1.1.0/mllib-statistics.html

According to the Spark docs:

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.mllib.linalg.DenseVector

val observations: RDD[Vector] = ... // an RDD of Vectors

// Compute column summary statistics.
val summary: MultivariateStatisticalSummary = Statistics.colStats(observations)

I have a problem building the observations: RDD[Vector] object. I tried:

scala> val data:Array[Double] = Array(1, 2, 3, 4, 5)
data: Array[Double] = Array(1.0, 2.0, 3.0, 4.0, 5.0)

scala> val v = new DenseVector(data)
v: org.apache.spark.mllib.linalg.DenseVector = [1.0,2.0,3.0,4.0,5.0]

scala> val observations = sc.parallelize(Array(v))
observations: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.DenseVector] = ParallelCollectionRDD[3] at parallelize at <console>:19

scala> val summary: MultivariateStatisticalSummary = Statistics.colStats(observations)
<console>:21: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.DenseVector]
 required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
Note: org.apache.spark.mllib.linalg.DenseVector <: org.apache.spark.mllib.linalg.Vector, but class RDD is invariant in type T.
You may wish to define T as +T instead. (SLS 4.5)
val summary: MultivariateStatisticalSummary =  Statistics.colStats(observations)

Questions:

1) How should I cast DenseVector to Vector?

2) In my real program, instead of an array of doubles, I have to get statistics on a collection that I obtain from an RDD using:

def countByKey(): Map[K, Long]
//Count the number of elements for each key, and return the result to the master as a Map.

So I have to do:

 myRdd.countByKey().values.map(_.toDouble)

This does not make much sense, because instead of working with RDDs I now have to work with regular Scala collections, which at some point stop fitting into memory. All the advantages of Spark's distributed computation are lost.

How can I solve this in a scalable manner?

Update

In my case I have:

val cnts: org.apache.spark.rdd.RDD[Int] = prodCntByCity.map(_._2) // get product counts only 
val doubleCnts: org.apache.spark.rdd.RDD[Double] = cnts.map(_.toDouble)

How do I convert doubleCnts into observations: RDD[Vector]?


2 Answers

1 vote

1) You don't need a cast; you just need a type ascription:

val observations = sc.parallelize(Array(v: Vector))
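
If you already have an RDD[DenseVector] (as in the question's REPL session), a minimal sketch of the same idea is to widen the element type inside a map, since RDD is invariant (the val name asVectors is just an example):

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Ascribe each element to its supertype Vector; the resulting RDD[Vector]
// is what Statistics.colStats expects.
val asVectors: RDD[Vector] = observations.map(v => (v: Vector))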

2) Use aggregateByKey (map each key's records to 1 and reduce by summing) rather than countByKey, so the counts stay in an RDD instead of being pulled back to the driver.
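
A minimal sketch of that idea, kept entirely on the cluster (myRdd is the question's pair RDD; the String key type, the use of reduceByKey for the map-to-1-and-sum step, and the final colStats call are assumptions added here):

import org.apache.spark.SparkContext._   // pair-RDD implicits such as reduceByKey (needed on Spark 1.1)
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.rdd.RDD

// Count per key without collecting a Map to the driver:
// replace every value with 1L, then sum the ones per key.
val countsByKey: RDD[(String, Long)] = myRdd.mapValues(_ => 1L).reduceByKey(_ + _)

// Wrap each count in a one-element Vector so colStats can consume it.
val obs: RDD[Vector] = countsByKey.map { case (_, cnt) => Vectors.dense(cnt.toDouble) }

val summary: MultivariateStatisticalSummary = Statistics.colStats(obs)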

0 votes

DenseVector has a compressed method, so you can turn an RDD[DenseVector] into an RDD[Vector] like this:

val st = observations.map(x => x.compressed)
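
If compressed is available in your Spark version, a usage sketch continuing from that line (the colStats call and the printouts are additions here, assuming the imports from the question):

import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}

// st has the static type RDD[Vector], so colStats accepts it directly.
val summary: MultivariateStatisticalSummary = Statistics.colStats(st)
println(summary.mean)      // per-column means
println(summary.variance)  // per-column variances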