6
votes

I'm looking to flatten an RDD of tuples (using a no-op map), but I'm getting a type error:

val fromTuples = sc.parallelize( List((1,"a"), (2, "b"), (3, "c")) )
val flattened = fromTuples.flatMap(x => x)
println(flattened.collect().mkString(", "))

This gives:

error: type mismatch;
 found   : (Int, String)
 required: TraversableOnce[?]
        val flattened = fromTuples.flatMap(x => x)

The equivalent list of Lists or Arrays works fine, e.g.:

val fromList = sc.parallelize(List(List(1, 2), List(3, 4)))
val flattened = fromList.flatMap(x => x)
println(flattened.collect().mkString(", "))

Can Scala handle this? If not, why not?


5 Answers

11
votes

Tuples aren't collections. Unlike Python, where a tuple is essentially just an immutable list, a tuple in Scala is more like a class (or more like a Python namedtuple). You can't "flatten" a tuple, because it's a heterogeneous group of fields.

You can convert a tuple to something iterable by calling .productIterator on it, but what you get back is an Iterable[Any]. You can certainly flatten such a thing, but you've lost all compile-time type protection that way. (Most Scala programmers shudder at the thought of a collection of type Any.)
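For instance, a minimal plain-Scala sketch (no Spark needed) of what `.productIterator` returns:

```scala
// A tuple is a product of heterogeneous fields, not a collection.
val pair: (Int, String) = (1, "a")

// productIterator is available on every tuple, but it erases the
// per-field types: you get back an Iterator[Any].
val elems: List[Any] = pair.productIterator.toList
// elems is List(1, "a"), with each element typed as Any
```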

4
votes

There isn't a great way, but you can preserve a little type safety with this approach:

val fromTuples = session.sparkContext.parallelize(List((1, "a"), (2, "b"), (3, "c")))
val flattened = fromTuples.flatMap(t => Seq(t._1, t._2))
println(flattened.collect().mkString)

The element type of flattened will be the least upper bound (the closest common supertype) of all the types in the tuple. Which, yes, in this case is Any, but if the list were List(("1", "a"), ("2", "b")) it would preserve the String type.
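As a plain-Scala illustration of that least-upper-bound behaviour (the same typing applies to the elements of an RDD):

```scala
// Int and String only unify at Any:
val mixed: List[Any] =
  List((1, "a"), (2, "b")).flatMap(t => Seq(t._1, t._2))

// Homogeneous components keep their type:
val strings: List[String] =
  List(("1", "a"), ("2", "b")).flatMap(t => Seq(t._1, t._2))
// strings is List("1", "a", "2", "b")
```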

1
votes
val fromTuples = sc.parallelize(List((1, "a"), (2, "b"), (3, "c")))
val flattened = fromTuples.flatMap(x => Array(x))
flattened.collect()

The reason for your error is that flatMap requires the function to return a collection, not a single value. From the Spark documentation:

flatMap(func): Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).

1
votes

From Lyuben's comment, this actually can be done, sneakily:

sc.parallelize(List(("a", 1), ("c", 2), ("e", 4))).flatMap(_.productIterator).collect()

All honour to him. (Though as Brian notes, this forgoes type safety.)

1
votes

As others have said, there isn't a great way to do this, especially with respect to type safety.

However if you just want to print out the RDD in a nice flat format you can just map the RDD and use mkString:

scala> val myRDD = sc.parallelize( List((1,"a"), (2, "b"), (3, "c")) )
myRDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> myRDD.map{case (a,b) => s"$a,$b"}.collect.mkString(",")
res0: String = 1,a,2,b,3,c