1
votes

I have the table below, and I am adding a rowId column containing a sequence of numbers so that I can perform a join on it, but the code throws the error shown below. What am I doing wrong?

fListVec: org.apache.spark.sql.DataFrame = [features: vector]
+-----------------------------------------------------------------------------+
|features                                                                     |
+-----------------------------------------------------------------------------+
|[2.5046410000000003,2.1487149999999997,1.0884870000000002,3.5877090000000003]|
|[0.9558040000000001,0.9843780000000002,0.545025,0.9979860000000002]          |
+-----------------------------------------------------------------------------+

Code:

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

val fListrdd = fListVec.rdd
    .map{case Row(features: Vector) => features}
    .zipWithIndex()
    .toDF("features","rowId")    

fListrdd.createOrReplaceTempView("featuresTable")
val f = spark.sql("SELECT features, rowId from featuresTable")
f.show(false)

Output:

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 206.0 failed 1 times, most recent failure: Lost task 0.0 in stage 206.0 (TID 1718, localhost, executor driver): scala.MatchError: [[2.5046410000000003,2.1487149999999997,1.0884870000000002,3.5877090000000003]] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
  at $$$$4896e3e877b134a87d9ee46b238e22$$$$$anonfun$1.apply(:193)
  at $$$$4896e3e877b134a87d9ee46b238e22$$$$$anonfun$1.apply(:193)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1762)
  at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
  at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
  at org.apache.spark.rdd.ZippedWithIndexRDD.(ZippedWithIndexRDD.scala:50)
  at org.apache.spark.rdd.RDD$$anonfun$zipWithIndex$1.apply(RDD.scala:1293)
  at org.apache.spark.rdd.RDD$$anonfun$zipWithIndex$1.apply(RDD.scala:1293)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.zipWithIndex(RDD.scala:1292)
  ... 101 elided
Caused by: scala.MatchError: [[2.5046410000000003,2.1487149999999997,1.0884870000000002,3.5877090000000003]] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
  at $$$$4896e3e877b134a87d9ee46b238e22$$$$$anonfun$1.apply(:193)
  at $$$$4896e3e877b134a87d9ee46b238e22$$$$$anonfun$1.apply(:193)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1762)
  at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
  at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
  ... 3 more

Expected Output:

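+------------------------+-----+
|features                |rowId|
+------------------------+-----+
|[2.5046410000000003,...]|0    |
|[0.9558040000000001,...]|1    |
+------------------------+-----+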
2

2 Answers

2
votes

You need an extra map step after zipWithIndex that casts each element to a concrete type, so that toDF knows the data types of the columns in the new DataFrame:

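import org.apache.spark.sql.Row
// Assumption: the column holds mllib vectors; import org.apache.spark.ml.linalg.DenseVector instead if yours are ml vectors.
import org.apache.spark.mllib.linalg.DenseVector

val fListrdd = fListVec.rdd
  .map{case Row(features) => features}                     // untyped pattern: matches any value
  .zipWithIndex()
  .map(x => (x._1.asInstanceOf[DenseVector], x._2.toInt))  // cast so toDF can infer the column types
  .toDF("features","rowId")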

The only line added to your code is .map(x => (x._1.asInstanceOf[DenseVector], x._2.toInt)).
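To illustrate why the typed pattern in the question can fail, here is a minimal Spark-free sketch. It assumes the column was built from org.apache.spark.mllib.linalg vectors while the question imports org.apache.spark.ml.linalg.Vector; that is a plausible reading of the error, not something the question confirms. The objects ml and mllib and the class Row1 are hypothetical stand-ins, not Spark classes.

```scala
// Stand-ins for two unrelated classes that share a simple name, in the way
// org.apache.spark.ml.linalg.DenseVector and org.apache.spark.mllib.linalg.DenseVector do.
object ml    { final case class DenseVector(values: Array[Double]) }
object mllib { final case class DenseVector(values: Array[Double]) }

// Hypothetical stand-in for a one-column Row that stores an untyped value.
final case class Row1(value: Any)

object MatchErrorSketch {
  // Typed pattern: matches only when the runtime class is ml.DenseVector.
  def typed(row: Row1): ml.DenseVector = row match {
    case Row1(v: ml.DenseVector) => v
  }

  // Untyped pattern: matches whatever the row holds.
  def untyped(row: Row1): Any = row match {
    case Row1(v) => v
  }

  def main(args: Array[String]): Unit = {
    val row = Row1(mllib.DenseVector(Array(2.5, 2.1)))

    // The untyped pattern succeeds and returns the mllib value unchanged.
    println(untyped(row).getClass.getName)

    // The typed pattern has no matching case, so scala.MatchError is thrown.
    println(scala.util.Try(typed(row)).isFailure) // true
  }
}
```

An untyped pattern followed by asInstanceOf only moves the type check: the cast still has to name the class that is really stored in the column, or it fails with a ClassCastException instead.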

You can go one step further and create a Dataset. I personally recommend Datasets, because they are a type-safe and optimized form of DataFrames.

For that you would need a case class

case class features(features: DenseVector, rowId: Int)

Then wrap each tuple in the features case class in the solution above, so that you can call .toDS to create a type-safe Dataset.

val fListDS = fListVec.rdd
  .map{case Row(features: DenseVector) => features}
  .zipWithIndex()
  .map(x => features(x._1.asInstanceOf[DenseVector], x._2.toInt))
  .toDS
1
votes

You're almost there; you just need to specify the proper vector type, DenseVector:

import org.apache.spark.sql.functions._
import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.sql.Row

val fList = Seq(
  (Seq(2.5046410000000003, 2.1487149999999997, 1.0884870000000002, 3.5877090000000003)),
  (Seq(0.9558040000000001, 0.9843780000000002, 0.545025, 0.9979860000000002))
).toDF("features")

def seqToVec = udf(
  (s: Seq[Double]) => new DenseVector(s.toArray)
)

val fListVec = fList.withColumn("features", seqToVec($"features"))
// fListVec: org.apache.spark.sql.DataFrame = [features: vector]

val fListrdd = fListVec.rdd.
  map{ case Row(features: DenseVector) => features }.
  zipWithIndex.
  toDF("features", "rowId")  

fListrdd.show
// +--------------------+-----+
// |            features|rowId|
// +--------------------+-----+
// |[2.50464100000000...|    0|
// |[0.95580400000000...|    1|
// +--------------------+-----+
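
As a side note, if matching on the vector class is the fragile part, the index can be appended without touching the column types at all. The sketch below is one way to do that, not the only one: it zips the Row objects themselves and extends the existing schema with a Long column. The helper name withRowId and the object AddRowId are made up for illustration, and the sample data uses ml vectors only as an assumption so the example is self-contained.

```scala
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object AddRowId {
  // Hypothetical helper: appends a 0-based Long index column to any DataFrame.
  // Existing columns keep their types because the original schema is reused.
  def withRowId(df: DataFrame, name: String = "rowId"): DataFrame = {
    val indexed = df.rdd.zipWithIndex().map {
      case (row, idx) => Row.fromSeq(row.toSeq :+ idx)
    }
    val schema = StructType(df.schema.fields :+ StructField(name, LongType, nullable = false))
    df.sparkSession.createDataFrame(indexed, schema)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("rowId-sketch").getOrCreate()
    import spark.implicits._

    // Sample data shaped like the question's table (assumed to be ml vectors here).
    val fListVec = Seq(
      Tuple1(Vectors.dense(2.5046410000000003, 2.1487149999999997, 1.0884870000000002, 3.5877090000000003)),
      Tuple1(Vectors.dense(0.9558040000000001, 0.9843780000000002, 0.545025, 0.9979860000000002))
    ).toDF("features")

    withRowId(fListVec).show(false)
    spark.stop()
  }
}
```

Because no pattern names a vector class, the same helper should work whether the column holds ml or mllib vectors. The index stays a Long, as zipWithIndex produces it, which avoids the narrowing that toInt would introduce on very large tables.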