
I am trying to construct a distinction matrix using Spark and am unsure how to do it optimally. I am new to Spark. A small example of what I'm trying to do is given below.

Example of distinction matrix construction:

Given Dataset D:

+----+-----+------+-----+  
| id | a1  |  a2  | a3  |  
+----+-----+------+-----+  
|  1 | yes | high | on  |  
|  2 | no  | high | off |
|  3 | yes | low  | off |
+----+-----+------+-----+

and my distinction table is

+-------+----+----+----+
| id,id | a1 | a2 | a3 |
+-------+----+----+----+
| 1,2   |  1 |  0 |  1 |
| 1,3   |  0 |  1 |  1 |
| 2,3   |  1 |  1 |  0 |
+-------+----+----+----+

That is, whenever an attribute ai is helpful in distinguishing a pair of tuples, the distinction table has a 1; otherwise it has a 0.

My datasets are huge, so I am trying to do this in Spark. These are the approaches that came to mind:

  1. Use a nested for loop to iterate over all members of the RDD (of the dataset).
  2. Use the cartesian() transformation on the original RDD and iterate over all members of the resulting RDD to get the distinction table.

My questions are:
In the 1st approach, does Spark automatically optimize the nested for loop internally for parallel processing?

In the 2nd approach, using cartesian() incurs extra storage overhead for the intermediate RDD. Is there any way to avoid this overhead and still get the final distinction table?

Which of these approaches is better, and is there any other approach that would construct the distinction matrix efficiently (in both space and time)?

By "is helpful in distinguishing" do you mean "are different"? - Leonardo Herrera
Yes, if the attribute values are 'different', they help to distinguish a pair of tuples. - Sasank Annavarapu
There is no such thing as a "for loop" in Scala, but rather a "for comprehension", which is just syntactic sugar for calls to methods such as map, flatMap and filter, all of which are distributed on Spark. - For your second question: in general, yes, a cartesian product is expensive. However, if you use a Dataset or a DataFrame instead of an RDD and apply a filter after the join, it will be optimized. - Luis Miguel Mejía Suárez

1 Answer


For this dataframe:

scala> val df = List((1, "yes", "high", "on" ), (2,  "no", "high", "off"), (3, "yes",  "low", "off") ).toDF("id", "a1", "a2", "a3")
df: org.apache.spark.sql.DataFrame = [id: int, a1: string ... 2 more fields]

scala> df.show
+---+---+----+---+
| id| a1|  a2| a3|
+---+---+----+---+
|  1|yes|high| on|
|  2| no|high|off|
|  3|yes| low|off|
+---+---+----+---+

We can build a cartesian product by using crossJoin on the dataframe with itself. However, the column names would then be ambiguous (I don't know an easy way to deal with that). To prepare for that, let's create a second dataframe with renamed columns:

scala> val df2 = df.toDF("id_2", "a1_2", "a2_2", "a3_2")
df2: org.apache.spark.sql.DataFrame = [id_2: int, a1_2: string ... 2 more fields]

scala> df2.show
+----+----+----+----+
|id_2|a1_2|a2_2|a3_2|
+----+----+----+----+
|   1| yes|high|  on|
|   2|  no|high| off|
|   3| yes| low| off|
+----+----+----+----+

In this example, after the cross join we can keep each pair of rows exactly once by filtering on id < id_2.

scala> val xp = df.crossJoin(df2)
xp: org.apache.spark.sql.DataFrame = [id: int, a1: string ... 6 more fields]

scala> xp.show
+---+---+----+---+----+----+----+----+
| id| a1|  a2| a3|id_2|a1_2|a2_2|a3_2|
+---+---+----+---+----+----+----+----+
|  1|yes|high| on|   1| yes|high|  on|
|  1|yes|high| on|   2|  no|high| off|
|  1|yes|high| on|   3| yes| low| off|
|  2| no|high|off|   1| yes|high|  on|
|  2| no|high|off|   2|  no|high| off|
|  2| no|high|off|   3| yes| low| off|
|  3|yes| low|off|   1| yes|high|  on|
|  3|yes| low|off|   2|  no|high| off|
|  3|yes| low|off|   3| yes| low| off|
+---+---+----+---+----+----+----+----+


scala> val filtered = xp.filter($"id" < $"id_2")
filtered: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, a1: string ... 6 more fields]

scala> filtered.show
+---+---+----+---+----+----+----+----+
| id| a1|  a2| a3|id_2|a1_2|a2_2|a3_2|
+---+---+----+---+----+----+----+----+
|  1|yes|high| on|   2|  no|high| off|
|  1|yes|high| on|   3| yes| low| off|
|  2| no|high|off|   3| yes| low| off|
+---+---+----+---+----+----+----+----+
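
As a side note on the ambiguous column names mentioned above: one possible way to avoid the renamed copy is to alias each side of the self-join and refer to columns through the alias. The following is only a minimal sketch, not the method used in the rest of this answer. The alias names "l" and "r", the app name, and the local SparkSession are assumptions made for illustration (in spark-shell, `spark` already exists).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Local session for illustration only (assumption); spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("distinction-alias").getOrCreate()
import spark.implicits._

val df = List((1, "yes", "high", "on"), (2, "no", "high", "off"), (3, "yes", "low", "off"))
  .toDF("id", "a1", "a2", "a3")

// Alias both sides so every column can be addressed unambiguously as "l.x" or "r.x".
val left = df.as("l")
val right = df.as("r")

// Same cross join and filter as above, but without a renamed second dataframe.
val pairs = left.crossJoin(right).filter(col("l.id") < col("r.id"))

pairs.select(col("l.id").as("id"), col("r.id").as("id_2")).show()
```

The trade-off is that downstream expressions must keep using the "l." and "r." prefixes, whereas the renamed dataframe gives plain, unique column names.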

At this point the problem is basically solved. To get the final table we can use a when().otherwise() expression on each column pair, or a UDF as I have done here:

scala> val dist = udf((a:String, b: String) => if (a != b) 1 else 0)
dist: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,IntegerType,Some(List(StringType, StringType)))

scala> val distinction = filtered.select($"id", $"id_2", dist($"a1", $"a1_2").as("a1"), dist($"a2", $"a2_2").as("a2"), dist($"a3", $"a3_2").as("a3"))
distinction: org.apache.spark.sql.DataFrame = [id: int, id_2: int ... 3 more fields]

scala> distinction.show
+---+----+---+---+---+
| id|id_2| a1| a2| a3|
+---+----+---+---+---+
|  1|   2|  1|  0|  1|
|  1|   3|  0|  1|  1|
|  2|   3|  1|  1|  0|
+---+----+---+---+---+
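
For comparison, here is a hedged sketch of the when().otherwise() variant mentioned above, generalized over all attribute columns so they do not have to be listed by hand. It assumes the identifier column is named "id" and that every other column is an attribute; the alias names "l" and "r" and the local SparkSession are illustrative assumptions. Built-in column expressions are generally easier for Spark's optimizer to work with than a UDF, which it treats as opaque.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, when}

// Local session for illustration only (assumption); spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("distinction-when").getOrCreate()
import spark.implicits._

val df = List((1, "yes", "high", "on"), (2, "no", "high", "off"), (3, "yes", "low", "off"))
  .toDF("id", "a1", "a2", "a3")

// Every column except "id" is treated as an attribute (assumption).
val attrs = df.columns.toSeq.filter(_ != "id")

// Each pair of rows once, using aliases to tell the two sides apart.
val pairs = df.as("l").crossJoin(df.as("r")).filter(col("l.id") < col("r.id"))

// 1 when the two values differ, 0 otherwise. Note: with =!=, a null on either
// side yields 0 here, unlike the UDF above; !(x <=> y) would be null-safe.
val flags = attrs.map { a =>
  when(col(s"l.$a") =!= col(s"r.$a"), 1).otherwise(0).as(a)
}

val distinction = pairs.select(Seq(col("l.id").as("id"), col("r.id").as("id_2")) ++ flags: _*)
distinction.show()
```

On the three-row example this should give the same table as the UDF version. It does not reduce the number of pairs: the output still has n(n-1)/2 rows for n input rows, so for very large datasets the size of the result itself remains the main cost.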