4 votes

I am new to Scala/Spark. I would like to convert a list of values into separate rows using only RDDs (no DataFrames). I'd appreciate it if anyone can help me with this.

Input :

List( ("A",List(10643, 10692)), ("B",List(10308)),("C",List(1000,2000)) )

Expected Output:

A 10643
A 10692
B 10308
C 1000
C 2000

I am able to do it separately, but not together.

This is what I tried

val Input = sc.makeRDD(List( ("A",List(10643, 10692)), ("B",List(10308)),("C",List(1000,2000))  ))

Input.map(value=>value._1).collect().foreach(println)
A
B
C

Input.map(value=>value._2).flatMap(x=>x).collect().foreach(println)
10643
10692
10308
1000
2000

4 Answers

3 votes

Use the function explode, which creates a new row for each element in the given array or map column:

import org.apache.spark.sql.functions._
import spark.implicits._  // for toDF and the $ syntax outside spark-shell

val data = List( ("A",List(10643, 10692)), ("B",List(10308)),("C",List(1000,2000)) )
val rdd = sc.parallelize(data) 
val df = rdd.toDF("name", "list")

val exploded = df.withColumn("value", explode($"list")).drop("list")
exploded.show

If you really prefer to use RDDs:

val flatted = rdd.flatMap(r => r._2.map((r._1, _)))
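To materialize and inspect the result, the one-liner above can be wrapped into a self-contained run (a sketch assuming a local master; the app and object names are arbitrary):

```scala
import org.apache.spark.sql.SparkSession

object FlattenExample extends App {
  // local SparkSession just for this example
  val spark = SparkSession.builder().master("local[*]").appName("flatten-example").getOrCreate()
  val sc = spark.sparkContext

  val data = List(("A", List(10643, 10692)), ("B", List(10308)), ("C", List(1000, 2000)))
  // emit one (key, value) pair per element of each list
  val flatted = sc.parallelize(data).flatMap(r => r._2.map((r._1, _)))
  flatted.collect().foreach(println)

  spark.stop()
}
```

collect() preserves partition order for an RDD built with parallelize, so this prints (A,10643), (A,10692), (B,10308), (C,1000), (C,2000).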
2 votes

Here's a Spark-agnostic solution:

val list = List(("A", List(10643, 10692)), ("B", List(10308)), ("C", List(1000, 2000)))

val result = list.flatMap { 
  case (key, value) => value.map(v => (key, v))
}

result.foreach(println)

// (A,10643)
// (A,10692)
// (B,10308)
// (C,1000)
// (C,2000)
0 votes

Two solutions (the second is more scalable):

Processing on the driver, then converting to an RDD

val in = List( ("A",List(10643, 10692)), ("B",List(10308)),("C",List(1000,2000)) )

val out = sc.parallelize(in.flatMap{case (k, l) => l.map(v => (k,v))})

out.take(10).foreach(println)

Converting to an RDD, then processing on the executors

val in = List( ("A",List(10643, 10692)), ("B",List(10308)),("C",List(1000,2000)) )

val inRDD = sc.parallelize(in)

val out = inRDD.flatMap{case (k, l) => l.map(v => (k,v))}

out.take(10).foreach(println)
0 votes

Using PairRDDFunctions:

import org.apache.spark.rdd.PairRDDFunctions
import scala.collection.mutable._

// assumes spark-shell (the implicits needed by toDF are already in scope)
val df = List( ("A",List(10643, 10692)), ("B",List(10308)), ("C",List(1000,2000)) ).toDF("name","list")
// Row fields come back as Any, so the list column must be cast before flattening
val rdd1 = df.rdd.map(x => (x(0), x(1)))
val pair = new PairRDDFunctions(rdd1)
pair.flatMapValues(_.asInstanceOf[WrappedArray[Int]]).foreach(println)

Results (order may vary, since foreach runs in parallel on the executors):

(A,10643)
(B,10308)
(A,10692)
(C,1000)
(C,2000)
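Note that when you start from the original list instead of a DataFrame, neither the explicit PairRDDFunctions wrapper nor the WrappedArray cast is needed: flatMapValues is available directly on any RDD of pairs through Spark's implicit conversions. A minimal sketch, assuming a SparkContext sc is in scope:

```scala
val data = List(("A", List(10643, 10692)), ("B", List(10308)), ("C", List(1000, 2000)))

// flatMapValues keeps each key and emits one row per element of its list
val pairs = sc.parallelize(data).flatMapValues(identity)
pairs.collect().foreach(println)
```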