2
votes

I have a file on HDFS in the form of:

61,139,75
63,140,77
64,129,82
68,128,56
71,140,47
73,141,38
75,128,59
64,129,61
64,129,80
64,129,99

I create an RDD from it and zip the elements with their index:

import org.apache.spark.mllib.linalg.Vectors

val data = sc.textFile("hdfs://localhost:54310/usrp/sample.txt")
val points = data.map(s => Vectors.dense(s.split(',').map(_.toDouble)))
val indexed = points.zipWithIndex()
val indexedData = indexed.map { case (value, index) => (index, value) }

Now I need to create rdd1 with the index and the first two elements of each line, and then rdd2 with the index and the third element of each line. I am new to Scala; can you please help me with how to do this?

The following does not work, because y is not a Scala collection Vector but an org.apache.spark.mllib.linalg.Vector, which has no take method:

val rdd1 = indexedData.map{case (x,y) => (x,y.take(2))}

Basically, how do I get the first two elements of such a vector?

Thanks.

2 Answers

1
votes

You can make use of DenseVector's unapply method to get the underlying Array[Double] in your pattern-matching, and then call take/drop on the Array, re-wrapping it with a Vector:

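import org.apache.spark.mllib.linalg.{DenseVector, Vectors}

// The pattern only matches dense vectors; a SparseVector here would throw a MatchError.
val rdd1 = indexedData.map { case (i, DenseVector(arr)) => (i, Vectors.dense(arr.take(2))) }
val rdd2 = indexedData.map { case (i, DenseVector(arr)) => (i, Vectors.dense(arr.drop(2))) }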
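
For readers new to Scala, here is a minimal sketch of what take and drop do to a single parsed row. It uses plain Scala with no Spark dependency; the names row, firstTwo and rest are illustrative only and do not appear in the question.

```scala
// One line of the sample file, parsed the same way as in the question.
val row: Array[Double] = "61,139,75".split(',').map(_.toDouble)

// take(2) keeps the first two elements; drop(2) keeps everything after them.
val firstTwo: Array[Double] = row.take(2)
val rest: Array[Double] = row.drop(2)

println(firstTwo.mkString(","))  // prints 61.0,139.0
println(rest.mkString(","))      // prints 75.0
```

Both calls return new arrays and leave row unchanged, so the same input can safely feed both rdd1 and rdd2.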

As you can see, this means the original DenseVector you created isn't really that useful. If you're not going to use indexedData anywhere else, it might be better to create indexedData as an RDD[(Long, Array[Double])] in the first place:

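import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD

// Keep the parsed rows as plain arrays and move the index to the key position.
val points = data.map(s => s.split(',').map(_.toDouble))
val indexedData: RDD[(Long, Array[Double])] = points.zipWithIndex().map(_.swap)

val rdd1 = indexedData.mapValues(arr => Vectors.dense(arr.take(2)))
val rdd2 = indexedData.mapValues(arr => Vectors.dense(arr.drop(2)))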

Last tip: you probably want to call .cache() on indexedData before scanning it twice to create rdd1 and rdd2; otherwise the file will be loaded and parsed twice.
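
A rough sketch of the caching tip, under some assumptions: it runs against a local master, and it uses sc.parallelize with three of the sample lines as a stand-in for sc.textFile, so no HDFS path is needed. The app name and the variables first and second are made up for illustration. SparkContext.getOrCreate reuses a running context (for example the one in spark-shell) instead of starting a second one.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors

// Assumption: local master for experimentation; an existing context is reused if present.
val conf = new SparkConf().setAppName("cache-sketch").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)

// Stand-in for sc.textFile(...): three lines from the sample data.
val data = sc.parallelize(Seq("61,139,75", "63,140,77", "64,129,82"))

// Parse, move the index to the key position, and mark the result for caching.
// cache() is lazy: nothing is stored until the first action runs.
val indexedData = data
  .map(_.split(',').map(_.toDouble))
  .zipWithIndex()
  .map(_.swap)
  .cache()

val rdd1 = indexedData.mapValues(arr => Vectors.dense(arr.take(2)))
val rdd2 = indexedData.mapValues(arr => Vectors.dense(arr.drop(2)))

val first = rdd1.collect()   // first action: computes indexedData and caches it
val second = rdd2.collect()  // second action: can read indexedData from the cache
```

If indexedData is no longer needed afterwards, calling indexedData.unpersist() releases the cached partitions.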

1
votes

You can get the required output with the following steps:

Original Data:

indexedData.foreach(println)
(0,[61.0,139.0,75.0])
(1,[63.0,140.0,77.0])
(2,[64.0,129.0,82.0])
(3,[68.0,128.0,56.0])
(4,[71.0,140.0,47.0])
(5,[73.0,141.0,38.0])
(6,[75.0,128.0,59.0])
(7,[64.0,129.0,61.0])
(8,[64.0,129.0,80.0])
(9,[64.0,129.0,99.0])

RDD1 Data:

The index along with the first two elements of each line.

val rdd1 = indexedData.map{case (x,y) => (x, (y.toArray(0), y.toArray(1)))}
rdd1.foreach(println)
(0,(61.0,139.0))
(1,(63.0,140.0))
(2,(64.0,129.0))
(3,(68.0,128.0))
(4,(71.0,140.0))
(5,(73.0,141.0))
(6,(75.0,128.0))
(7,(64.0,129.0))
(8,(64.0,129.0))
(9,(64.0,129.0))

RDD2 Data:

The index along with the third element of each line.

val rdd2 = indexedData.map{case (x,y) => (x, y.toArray(2))}
rdd2.foreach(println)
(0,75.0)
(1,77.0)
(2,82.0)
(3,56.0)
(4,47.0)
(5,38.0)
(6,59.0)
(7,61.0)
(8,80.0)
(9,99.0)
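
If the slices should stay MLlib vectors rather than tuples or bare doubles, the same toArray idea can be wrapped in small helpers. This is only a sketch: firstN and afterN are hypothetical names, not part of MLlib. Because toArray is defined on the Vector trait, they work for dense and sparse vectors alike.

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Hypothetical helpers (not part of MLlib): slice any mllib Vector via toArray.
def firstN(v: Vector, n: Int): Vector = Vectors.dense(v.toArray.take(n))
def afterN(v: Vector, n: Int): Vector = Vectors.dense(v.toArray.drop(n))

val v = Vectors.dense(61.0, 139.0, 75.0)
val head2 = firstN(v, 2)  // holds 61.0 and 139.0
val tail1 = afterN(v, 2)  // holds 75.0
```

With indexedData from the question, this would be used as indexedData.mapValues(firstN(_, 2)) and indexedData.mapValues(afterN(_, 2)). Note that a sparse vector is expanded to a full array by toArray, which may be costly for very wide vectors.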