0
votes

I have an RDD of items, and a function d: (Item, Item) => Double that computes the distance between two items. I am trying to compute the average distance between items drawn at random from the RDD. The RDD is fairly large (100s of millions), so computing the exact average is out of the question.

Therefore I would like to get an RDD of sampled pairs of items (from which I would compute the distances). For example, I want to get a sample of 100m pairs. Given the RDD of sampled pairs, I would then compute the average, histogram etc. in order to understand the distance distribution.

Here are the initial attempts which have all failed:

  1. Generate two RDDs using .sample, zip them and compute the distance between items. This fails since .zip requires both RDDs to have the exact same number of items per partition.

  2. Use .cartesian of the RDD with itself, and then .sample. This fails (out of memory) since apparently cartesian is not meant to be used this way.

  3. collect two small samples of the RDD, and .zip the two arrays. This works fine but it doesn't scale.

Any ideas?

Thanks!


Edit: here is how to zip two samples with different number of items per partition:

val r = ... // RDD[Item]
val s1 = r.sample(true, 0.1, 123)
val s2 = r.sample(true, 0.1, 456)
val zipper = (i1: Iterator[Item], i2: Iterator[Item]) => i1.zip(i2)
val pairs = r1.zipPartitions(r2)(zipper) // zip the RDDs and explicitly define how to zip the partitions

The key is that while RDD's .zip method does not accept partitions with unequal sizes, the .zip method for iterators does (and discards the remaining part of the longer iterator).

1
"This fails (out of memory) since apparently cartesian is not meant to be used this way." Can you elaborate why you think it isn't meant to be used that way? - Yuval Itzchakov
Did you try something like rdd.cartesian(rdd).filter(lambda (a, b): a.key < b.key). ... you can reduce half of the data this way - Alberto Bonsanto
@YuvalItzchakov The issue with .cartesian is that spark attempts to create n^2 partitions, each with roughly m^2 elements (assuming there are initially n partitions and about m items per partitions). The thing is, that I have no need for that many partitions (or that many item pairs). Typical numbers are: 50 partitions, with 10M items per partition. It is not feasible to store even a single partition of the cartesian product RDD. I tried to sample it by a factor of 1e-7, but no success. I guess because spark actually creates the product partitions on the executors. - Amir
@AlbertoBonsanto Thanks for the comment. But my issue is not a factor of 2, but closer to a factor of 1e6... I have no need to the actual .cartesian RDD, but only to a tiny fraction of it. As mention above, .sample did not work. - Amir
@Amir Ok, that's what I wanted to know. From what you said I thought there was an issue with the API, but I understand the n ^ 2 problem where you don't really need that much samples. - Yuval Itzchakov

1 Answers

1
votes

Answering my own question:

  1. Obtain a sample of the rdd (with replacement),
  2. use .sliding(2) to obtain consecutive pairs of the sample.

Code:

import org.apache.spark.mllib.rdd.RDDFunctions._ // for .sliding
val x = ... // RDD[Item]
val xSize = x.count
val n = 1000000.0 // (approximate) desired sample size
val pairs = x.sample(true, n/xSize).sliding(2)
val distances = pairs.map(arr => dist(arr(0), arr(1)))