1
votes

I'm using Spark 1.1.0.

I have 2 RDDs firstSample and secondSample of type JavaRDD<IndividualBean>. The contents of these RDDs is as follows:

[
IndividualBean [params={...}], 
IndividualBean [params={...}], 
IndividualBean [params={...}]
]

[
IndividualBean [params={...}], 
IndividualBean [params={...}], 
IndividualBean [params={...}]
]

When I try to zip them together, I get the following error:

Can only zip RDDs with same number of elements in each partition

I guess that this is because my RDDs do not have the same number of partitions, or the same number of elements per partition.

I would like to perform an operation on these RDDs that will give me the same results as zip.

Right now, I found the following solution (with the totalSize variable just being the size of firstSample.union(secondSample)):

JavaPairRDD<IndividualBean, IndividualBean> zipped = firstSample.union(secondSample).zipWithIndex().mapToPair(
            new PairFunction<Tuple2<IndividualBean,Long>, Long, IndividualBean>() {
                @Override
                public Tuple2<Long, IndividualBean> call(
                        Tuple2<IndividualBean, Long> tuple) throws Exception {
                    return new Tuple2<Long, IndividualBean>(tuple._2, tuple._1);
                }
    }).groupBy(new Function<Tuple2<Long,IndividualBean>, Long>() {
        @Override
        public Long call(Tuple2<Long, IndividualBean> tuple) throws Exception {
            long index = tuple._1.longValue();
            if(index < totalSize/2){
                return index+totalSize/2;
            }
            return index;
        }
    }).values().mapToPair(new PairFunction<Iterable<Tuple2<Long, IndividualBean>>, IndividualBean, IndividualBean>() {
        @Override
        public Tuple2<IndividualBean, IndividualBean> call(
                Iterable<Tuple2<Long, IndividualBean>> iterable) throws Exception {
            Iterator<Tuple2<Long, IndividualBean>> it = iterable.iterator();
            IndividualBean firstBean = it.next()._2;
            IndividualBean secondBean = it.next()._2;
            return new Tuple2<IndividualBean, IndividualBean>(firstBean, secondBean);
        }
    });

But it is quite expensive, since it involves shuffling.

What could be a better way to do this?

1
I think you need to step back and think about why these RDDs are partitioned differently. Is it by accident or because you need them to be? As long as they're partitioned differently ANY method of zipping them is going to require a lot of communication and hence will be expensive. It may make sense to make sure they're identically partitioned from the start, or to re-partition one or both of them at some point, depending on your application. If you give us more background it'll be a lot easier to help you. - Spiro Michaylov
Well, basically, I have a population a IndividualBean. Then I randomly take (using sample) at most half the subset of this population (subset A), and compare each individual in this subset A, to another individual contained in the remaining subset (I make sure this remaining subset B is of the same size as subset A). - julien
This process of comparing individuals is part of a selection process in a genetic algorithm. The zip operation allows me to do this comparison, but since I randomly choose individuals to be compared, I always end up with non compatible(# number of partitions,# number of elements/partition) subsets involved. - julien
Are you willing to accept the constraint that each of the pairs takes both elements from the same partition? Then it should be possible to create your zip (using one of the overloads of zipPartitions) without any communication whatsoever. Subsets A and B would probably be computed using mapPartitions and some trickery. - Spiro Michaylov
Thanks a lot, how can I obtain a JavaPairRDD<IndividualBean, IndividualBean> at the end using mapPartitions and zipPartitions? could you show me how I can code this solution? - julien

1 Answers

0
votes

A solution in Scala because that's how I've dome all my Spark programming.

The key to this solution is to keep the same partitioning scheme throughout, and then zip the individual partitions together. To achieve this the solution plays fast and loose with sampling. In particular, the data point paired with each randomly chosen point is:

  1. Chosen from the same partition
  2. Not chosen at random (in fact will tend to have come from right next to it in the original RDD)

The first of these simplifications is essential to the solution. The second could be removed by adding some code to the zipFunc defined below to reorder one side of the zip.

It's important to understand what zipFunc does: I'm zipping together the sample and its complement, and these are not even the same size. I trivially zip up the contents of corresponding partitions in the two RDDs and even though they don't have the same number of samples: when I run out of samples on one side of the zip I just drop the ones on the other side.

val testRDD = sc.parallelize(1 to 1000, 4)

val firstSample = testRDD.sample(false, 0.4)
val remaining = testRDD.subtract(firstSample)

def zipFunc(l: Iterator[Int], r: Iterator[Int]) : Iterator[(Int,Int)] = {
  val res = new ListBuffer[(Int, Int)]
  // exercise for the reader: suck either l or r into a container before iterating 
  // and consume it in random order to achieve more random pairing if desired
  while (l.hasNext && r.hasNext) {
    res += ((l.next(), r.next()))
  }
  res.iterator
}
// notice the `true` to make sure partitioning is preserved
val pairs:RDD[(Int,Int)] = firstSample.zipPartitions(remaining, true)(zipFunc)

As far as I can tell this requires no cross-partition communication. It does depend on your samples being drawn fairly evenly from across the partitions, and in my experience the sample() method is not too bad in this regard.