I'm using Spark 1.1.0.
I have two RDDs, firstSample and secondSample, both of type JavaRDD<IndividualBean>. Their contents are as follows:
[
IndividualBean [params={...}],
IndividualBean [params={...}],
IndividualBean [params={...}]
]
[
IndividualBean [params={...}],
IndividualBean [params={...}],
IndividualBean [params={...}]
]
When I try to zip them together, I get the following error:
Can only zip RDDs with same number of elements in each partition
I guess this is because my RDDs do not have the same number of partitions or the same number of elements per partition; zip requires both to match.
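To check which of the two it is, something along these lines should print the per-partition sizes (glom() gathers each partition into a list; it collects everything to the driver, so it is only meant for debugging small samples, and the helper's name is just for illustration):

import java.util.List;
import org.apache.spark.api.java.JavaRDD;

// Debug helper: print the partition count and the number of elements in
// each partition. zip() requires both to match between the two RDDs.
static void printPartitionSizes(String name, JavaRDD<IndividualBean> rdd) {
    List<List<IndividualBean>> partitions = rdd.glom().collect();
    System.out.println(name + " has " + partitions.size() + " partitions");
    for (int i = 0; i < partitions.size(); i++) {
        System.out.println("  partition " + i + ": "
                + partitions.get(i).size() + " elements");
    }
}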
I would like to perform an operation on these RDDs that will give me the same results as zip.
Right now, I have the following workaround (where the totalSize variable is just the count of firstSample.union(secondSample)):
JavaPairRDD<IndividualBean, IndividualBean> zipped = firstSample.union(secondSample)
        // Attach a global index to every element of the concatenated RDD,
        // then flip each (bean, index) tuple into (index, bean).
        .zipWithIndex()
        .mapToPair(new PairFunction<Tuple2<IndividualBean, Long>, Long, IndividualBean>() {
            @Override
            public Tuple2<Long, IndividualBean> call(
                    Tuple2<IndividualBean, Long> tuple) throws Exception {
                return new Tuple2<Long, IndividualBean>(tuple._2, tuple._1);
            }
        })
        // Key element i of the first half with the same value as element
        // i + totalSize/2 of the second half, so that matching elements
        // from the two original RDDs end up in the same group.
        .groupBy(new Function<Tuple2<Long, IndividualBean>, Long>() {
            @Override
            public Long call(Tuple2<Long, IndividualBean> tuple) throws Exception {
                long index = tuple._1.longValue();
                if (index < totalSize / 2) {
                    return index + totalSize / 2;
                }
                return index;
            }
        })
        .values()
        // Each group now holds exactly one element from each RDD; emit them
        // as a pair (note that the order inside a group is not guaranteed).
        .mapToPair(new PairFunction<Iterable<Tuple2<Long, IndividualBean>>, IndividualBean, IndividualBean>() {
            @Override
            public Tuple2<IndividualBean, IndividualBean> call(
                    Iterable<Tuple2<Long, IndividualBean>> iterable) throws Exception {
                Iterator<Tuple2<Long, IndividualBean>> it = iterable.iterator();
                IndividualBean firstBean = it.next()._2;
                IndividualBean secondBean = it.next()._2;
                return new Tuple2<IndividualBean, IndividualBean>(firstBean, secondBean);
            }
        });
But this is quite expensive, since the groupBy forces a full shuffle of the data.
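For reference, the simplest alternative I could come up with is to index each RDD separately with zipWithIndex and then join on the index. I assume the join still triggers a shuffle, so I am not sure it is actually cheaper (untested sketch):

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

// Flip (bean, index) into (index, bean) so the index becomes the join key.
PairFunction<Tuple2<IndividualBean, Long>, Long, IndividualBean> swap =
        new PairFunction<Tuple2<IndividualBean, Long>, Long, IndividualBean>() {
            @Override
            public Tuple2<Long, IndividualBean> call(
                    Tuple2<IndividualBean, Long> tuple) throws Exception {
                return new Tuple2<Long, IndividualBean>(tuple._2, tuple._1);
            }
        };

JavaPairRDD<Long, IndividualBean> indexedFirst = firstSample.zipWithIndex().mapToPair(swap);
JavaPairRDD<Long, IndividualBean> indexedSecond = secondSample.zipWithIndex().mapToPair(swap);

// join pairs up elements whose indices are equal; values() drops the index.
JavaRDD<Tuple2<IndividualBean, IndividualBean>> zipped =
        indexedFirst.join(indexedSecond).values();

One difference from zip: if the two RDDs have different lengths, join silently drops the unmatched tail instead of failing.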
What could be a better way to do this?