0 votes

I have the following simple code in Java. It creates a Map and fills it with 0 values.

Map<Integer,Integer> myMap = new HashMap<Integer,Integer>();
for (int i=0; i<=20; i++) { myMap.put(i, 0); }

I want to create a similar RDD using Spark and Scala. I tried the approach below, but it gives me RDD[(Any) => (Any,Int)] instead of RDD[Map[Int,Int]]. What am I doing wrong?

val data = (0 to 20).map(_ => (_,0))
val myMapRDD = sparkContext.parallelize(data)

3 Answers

2 votes

In Scala, (0 to 20).map(_ => (_, 0)) does not compile, as its placeholder syntax is invalid. I believe you might be looking for something like this instead:

val data = (0 to 20).map( _->0 )

which would generate a list of key-value pairs, and is really just a placeholder shorthand for:

val data = (0 to 20).map( n => n->0 )

// data: scala.collection.immutable.IndexedSeq[(Int, Int)] = Vector(
//   (0,0), (1,0), (2,0), (3,0), (4,0), (5,0), (6,0), (7,0), (8,0), (9,0), (10,0),
//   (11,0), (12,0), (13,0), (14,0), (15,0), (16,0), (17,0), (18,0), (19,0), (20,0)
// )

An RDD is an immutable collection of data (much like a Seq or Array, but distributed). To create an RDD of Map[Int,Int], you would expand data inside a Map, which in turn gets placed inside a Seq collection:

val rdd = sc.parallelize(Seq(Map(data: _*)))

rdd.collect
// res1: Array[scala.collection.immutable.Map[Int,Int]] = Array(
//   Map(0 -> 0, 5 -> 0, 10 -> 0, 14 -> 0, 20 -> 0, 1 -> 0, 6 -> 0, 9 -> 0, 13 -> 0, 2 -> 0, 17 -> 0,
//       12 -> 0, 7 -> 0, 3 -> 0, 18 -> 0, 16 -> 0, 11 -> 0, 8 -> 0, 19 -> 0, 4 -> 0, 15 -> 0)
// )

Note that, as is, this RDD consists of only a single Map; you can certainly assemble as many Maps as you wish in an RDD:

val rdd2 = sc.parallelize(Seq(
  Map((0 to 4).map( _->0 ): _*),
  Map((5 to 9).map( _->0 ): _*),
  Map((10 to 14).map( _->0 ): _*),
  Map((15 to 19).map( _->0 ): _*)
))
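
As a quick usage check (a small sketch, reusing the same sc handle), counting the elements confirms that rdd2 now holds four separate Maps:

rdd2.count
// Long = 4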

2 votes

What you are creating are tuples. Instead, you need to create Maps and parallelize them as below:

val data = (0 to 20).map(x => Map(x -> 0))
// data: scala.collection.immutable.IndexedSeq[scala.collection.immutable.Map[Int,Int]] = Vector(
//   Map(0 -> 0), Map(1 -> 0), Map(2 -> 0), Map(3 -> 0), Map(4 -> 0), Map(5 -> 0), Map(6 -> 0),
//   Map(7 -> 0), Map(8 -> 0), Map(9 -> 0), Map(10 -> 0), Map(11 -> 0), Map(12 -> 0), Map(13 -> 0),
//   Map(14 -> 0), Map(15 -> 0), Map(16 -> 0), Map(17 -> 0), Map(18 -> 0), Map(19 -> 0), Map(20 -> 0)
// )

val myMapRDD = sparkContext.parallelize(data)
// myMapRDD: org.apache.spark.rdd.RDD[scala.collection.immutable.Map[Int,Int]] = ParallelCollectionRDD[0] at parallelize at test.sc:19
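
If you ultimately want a single filled Map back on the driver, as in the original Java snippet, one possible follow-up (a sketch, assuming the data stays this small) is to merge the per-element Maps:

// merge the single-entry Maps into one Map on the driver (fine for small data like this)
val merged: Map[Int, Int] = myMapRDD.reduce(_ ++ _)
// merged contains 0 -> 0, 1 -> 0, ..., 20 -> 0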

1 vote

You can't parallelize a Map, as parallelize takes a Seq. What you can achieve is creating an RDD[(Int, Int)], which however does not enforce the uniqueness of keys. To perform operations by key, you can leverage PairRDDFunctions, which, despite this limitation, can end up being useful for your use case.

Let's at least try to get an RDD[(Int, Int)].

You used slightly "wrong" syntax when mapping over your range.

The _ placeholder can have different meanings depending on the context. The two meanings that got mixed up in your snippet of code are:

  • a placeholder for an anonymous function parameter that is not going to be used (as in (_ => 42), a function which ignores its input and always returns 42)
  • a positional placeholder for arguments in anonymous functions (as in (_, 42), a function that takes one argument and returns a tuple whose first element is the input and whose second is the number 42)

The above examples are simplified and do not account for type inference; they only aim to point out the two meanings of the _ placeholder that got mixed up in your snippet of code. The short snippet below illustrates both.
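
For illustration only, here is a minimal (hypothetical) snippet with explicit type annotations, so the expansion of each placeholder is unambiguous:

val alwaysFortyTwo: Int => Int          = _ => 42   // ignores its argument, always returns 42
val pairWithFortyTwo: Int => (Int, Int) = (_, 42)   // uses its argument as the first tuple element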

The first step is to use one of the two following functions to create the pairs that are going to be part of the map, either

a => (a, 0)

or

(_, 0)

and after parallelizing it you can get the RDD[(Int, Int)], as follows:

val pairRdd = sc.parallelize((0 to 20).map((_, 0)))

I believe it's worth noting here that mapping over the local collection is executed eagerly and bound to your driver's resources, while you can obtain the same final result by parallelizing the collection first and then mapping the pair-creating function over the RDD, as sketched below.
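
A sketch of that alternative, assuming the same sc handle; here map is an RDD transformation, so it is evaluated lazily on the executors rather than eagerly on the driver:

// parallelize the range first, then build the pairs on the cluster
val pairRddLazy = sc.parallelize(0 to 20).map((_, 0))   // same contents as pairRdd above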

Now, as mentioned, you don't have a distributed map, but rather a collection of key-value pairs where key uniqueness is not enforced. You can still work pretty seamlessly with those values using PairRDDFunctions, which you obtain automatically by importing org.apache.spark.rdd.RDD.rddToPairRDDFunctions (or without doing anything at all in the spark-shell, where the import has already been done for you), and which decorates your RDD by leveraging Scala's implicit conversions.

import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

pairRdd.mapValues(_ + 1).foreach(println)

will print the following (output order may vary, since the data is distributed across partitions):

(10,1)
(11,1)
(12,1)
(13,1)
(14,1)
(15,1)
(16,1)
(17,1)
(18,1)
(19,1)
(20,1)
(0,1)
(1,1)
(2,1)
(3,1)
(4,1)
(5,1)
(6,1)
(7,1)
(8,1)
(9,1)

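Other pair operations become available the same way; for instance, a sketch using reduceByKey to sum the values per key (all zeros here, so every key stays at 0):

pairRdd.reduceByKey(_ + _).collect()
// Array((0,0), (1,0), ..., (20,0)) -- ordering across partitions is not guaranteed
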
You can learn more about working with key-value pairs with the RDD API in the official documentation.