0 votes

I am reading CSV data (pairs of double values) from a directory and training a streaming K-means model on it as follows:

//CSV file

40.729,-73.9422
40.7476,-73.9871
40.7424,-74.0044
40.751,-73.9869
40.7406,-73.9902
.....

//SBT dependencies:

name := "Application name"

version := "0.1"

scalaVersion := "2.11.12"
val sparkVersion = "2.3.1"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-mllib"     % sparkVersion)

//import statements

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.mllib.clustering.{KMeans, StreamingKMeans}
import org.apache.spark.mllib.linalg.Vectors

//Reading CSV data

val trainingData = ssc.textFileStream("directory path")
                      .map(x => x.toDouble)
                      .map(x => Vectors.dense(x))
// applying streaming K-means model
val model = new StreamingKMeans()
  .setK(numClusters)
  .setDecayFactor(1.0)
  .setRandomCenters(numDimensions, 0.0)
model.trainOn(trainingData)

I get the following error:

18/07/24 11:20:04 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 1)
java.lang.NumberFormatException: For input string: "40.7473,-73.9857"
    at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043)
    at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
    at java.lang.Double.parseDouble(Double.java:538)
    at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:285)
    at scala.collection.immutable.StringOps.toDouble(StringOps.scala:29)
    at ubu$$anonfun$1.apply(uberclass.scala:305)
    at ubu$$anonfun$1.apply(uberclass.scala:305)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:193)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Exception in thread "streaming-job-executor-0" java.lang.Error: java.lang.InterruptedException
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Can anyone please help?

1
The error message is pretty clear: the string "40.7473,-73.9857" is not a number. It's two numbers separated by a comma. You need to split(",") it. – jwvh
Thanks for your reply, @jwvh. But after using the split function as below: `var trainingData = ssc.textFileStream("directory path").map(x => x.split(',').map(_.toDouble)).map(x => Vectors.dense(x))` it gives this error: `java.lang.IllegalArgumentException: requirement failed at scala.Predef$.require(Predef.scala:212)` – priti baheti
I'm only guessing, but I rather doubt that you are passing the correct type to the Vectors.dense() method. You might try: `.map(_.split(',').map(x => Vectors.dense(x.toDouble)))` – jwvh
@jwvh I did it the way you suggested, but it results in a stream of type DStream[Array[Vector]], which is not accepted by the trainOn method of streaming K-means. It needs streaming data of type DStream[Vector]. – priti baheti
Did you get a proper result in predictOnValues? – Suzy Tros
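The shape difference discussed in the comments above can be seen without Spark at all. A minimal sketch using plain Scala arrays as stand-ins for mllib vectors (the sample line is taken from the stack trace; everything else is illustrative):

```scala
object ParseShapeDemo {
  def main(args: Array[String]): Unit = {
    val line = "40.7473,-73.9857"

    // Shape produced by wrapping EACH field in its own vector:
    // an array of two 1-element arrays (per line this becomes
    // DStream[Array[Vector]] in the Spark code above).
    val perField: Array[Array[Double]] =
      line.split(',').map(x => Array(x.toDouble))

    // Shape trainOn needs: parse all fields first, then build ONE
    // 2-element vector per line (DStream[Vector] in Spark terms).
    val perLine: Array[Double] =
      line.split(',').map(_.toDouble)

    println(perField.map(_.mkString("[", ",", "]")).mkString(" "))
    println(perLine.mkString("[", ",", "]"))
  }
}
```

The second mapping is the one that matches `trainOn`'s expected `DStream[Vector]`: one vector per line, with as many elements as the line has fields.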

1 Answer

1 vote

There was a dimension issue: the dimension of each vector and the numDimensions value passed to the streaming K-means model (via setRandomCenters) must be the same. Each CSV line here has two fields, so every vector is 2-dimensional and numDimensions must be 2; otherwise training fails with `java.lang.IllegalArgumentException: requirement failed`.
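Putting the pieces together, a sketch of the corrected pipeline might look as follows. The master setting, app name, batch interval, and numClusters value are assumptions for illustration; the key points are the one-vector-per-line mapping and numDimensions matching the vector dimension:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors

object StreamingKMeansExample {
  def main(args: Array[String]): Unit = {
    // Assumed local setup; adjust master, app name, and batch interval.
    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingKMeansExample")
    val ssc  = new StreamingContext(conf, Seconds(5))

    // Each CSV line has 2 fields, so every vector is 2-dimensional.
    val numDimensions = 2
    val numClusters   = 4  // assumption; choose for your data

    // One Vector per line: split the line first, then build a single
    // dense vector from all of its fields -> DStream[Vector].
    val trainingData = ssc.textFileStream("directory path")
      .map(line => Vectors.dense(line.split(',').map(_.toDouble)))

    val model = new StreamingKMeans()
      .setK(numClusters)
      .setDecayFactor(1.0)
      // numDimensions must equal the vector dimension (2 here),
      // otherwise training fails with "requirement failed".
      .setRandomCenters(numDimensions, 0.0)

    model.trainOn(trainingData)

    ssc.start()
    ssc.awaitTermination()
  }
}
```

This keeps `"directory path"` as a placeholder, as in the question; in practice it should point at a directory into which new CSV files are atomically moved, since textFileStream only picks up newly created files.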