
I am trying to join two JavaPairRDDs using a full outer join. I want to apply a filter (like a WHERE clause in SQL) and also keep only one side of each joined pair (either the left or the right value, based on some condition). I tried calling filter on the joined RDD, but filter can only drop records; it cannot transform them to select one side. mapToPair can transform, but it does not let me drop records. Should I do a filter and then a map (or vice versa), even if that means two passes over the data? I would have expected the full outer join itself to support filtering and mapping together.

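import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.Optional; // Spark 2.x+; Spark 1.x used com.google.common.base.Optional
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

// agrp and bgrp are the two JavaPairRDD<String, MyData> being joined
JavaPairRDD<String, Tuple2<Optional<MyData>, Optional<MyData>>> fgrp = agrp.fullOuterJoin(bgrp);
JavaPairRDD<String, MyData> outmap = fgrp.mapToPair(
    new PairFunction<Tuple2<String, Tuple2<Optional<MyData>, Optional<MyData>>>, String, MyData>() {
        @Override
        public Tuple2<String, MyData> call(Tuple2<String, Tuple2<Optional<MyData>, Optional<MyData>>> arg0) throws Exception {
            Optional<MyData> left = arg0._2()._1();
            // stand-in for "based on some condition": keep the left value when present
            if (left.isPresent()) return new Tuple2<String, MyData>(arg0._1(), left.get());
            else return null; // this null is still emitted into outmap
        }
    });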

Returning null from mapToPair still leaves the null record in the resulting RDD. Is there a way to avoid that without an explicit filter?

Thanks Srivatsan


2 Answers

Answer 1

You can use flatMapToPair instead: return an empty iterator when you don't want the record included, and a single-element iterator when you do. That way the filtering and the choice of side happen in the same transformation, and no null ever reaches the RDD.
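A minimal sketch of the flatMapToPair approach, assuming Spark 2.x or later (where PairFlatMapFunction returns an Iterator; in Spark 1.x it returned an Iterable) and using String values in place of the question's MyData. The class name, the pick/row/run helpers, the sample keys, and the selection rule inside pick are hypothetical stand-ins for "based on some condition".

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;

public class FlatMapToPairExample {

    // Hypothetical selection rule standing in for "based on some condition":
    // key on both sides -> keep the left value; key only on the right -> keep
    // the right value; key only on the left -> drop the record.
    static Iterator<Tuple2<String, String>> pick(
            Tuple2<String, Tuple2<Optional<String>, Optional<String>>> t) {
        String key = t._1();
        Optional<String> left = t._2()._1();
        Optional<String> right = t._2()._2();
        if (left.isPresent() && right.isPresent()) {
            return Collections.singletonList(new Tuple2<>(key, left.get())).iterator();
        }
        if (right.isPresent()) {
            return Collections.singletonList(new Tuple2<>(key, right.get())).iterator();
        }
        // Emitting nothing (instead of returning null) keeps the record out of the RDD.
        return Collections.emptyIterator();
    }

    // Builds one full-outer-join record by hand (null = absent on that side);
    // handy for testing pick without starting Spark.
    static Tuple2<String, Tuple2<Optional<String>, Optional<String>>> row(
            String key, String left, String right) {
        return new Tuple2<>(key, new Tuple2<>(Optional.ofNullable(left), Optional.ofNullable(right)));
    }

    static Map<String, String> run() {
        SparkConf conf = new SparkConf().setAppName("flatMapToPair-sketch").setMaster("local[2]");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaPairRDD<String, String> a = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<String, String>("k1", "a1"),
                    new Tuple2<String, String>("k2", "a2")));
            JavaPairRDD<String, String> b = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<String, String>("k2", "b2"),
                    new Tuple2<String, String>("k3", "b3")));
            JavaPairRDD<String, Tuple2<Optional<String>, Optional<String>>> joined = a.fullOuterJoin(b);
            // One transformation both drops unwanted keys and selects a side.
            JavaPairRDD<String, String> out = joined.flatMapToPair(FlatMapToPairExample::pick);
            return out.collectAsMap();
        }
    }

    public static void main(String[] args) {
        // Expected contents: k2 -> a2 (both sides, left kept), k3 -> b3 (right only); k1 is dropped.
        System.out.println(run());
    }
}
```

The empty iterator plays the role of the question's `return null`, except that it produces no record at all, so no separate filter step is needed afterwards.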

Answer 2

As a rule of thumb, it's better to project and filter early, so that less data is shuffled around the cluster. I'd recommend following that here: filter first, then do the join. This does not cost two passes over the data: filter and map are narrow transformations, so Spark pipelines them within the same stage and applies the filter to each record as it is read.
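A minimal sketch of filtering before the join, assuming the Spark 2.x+ Java API in local mode. The `value >= 10` predicate in where is a hypothetical stand-in for the question's WHERE clause, and the class name and sample data are illustrative only.

```java
import java.util.Arrays;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;

public class FilterBeforeJoinExample {

    // Hypothetical WHERE clause: keep only records whose value is at least 10.
    static boolean where(Tuple2<String, Integer> kv) {
        return kv._2() >= 10;
    }

    static Map<String, Tuple2<Optional<Integer>, Optional<Integer>>> run() {
        SparkConf conf = new SparkConf().setAppName("filter-before-join").setMaster("local[2]");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaPairRDD<String, Integer> a = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<String, Integer>("k1", 5),
                    new Tuple2<String, Integer>("k2", 20)));
            JavaPairRDD<String, Integer> b = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<String, Integer>("k2", 30),
                    new Tuple2<String, Integer>("k3", 1)));
            // filter is narrow: it runs in the same stage that reads each input,
            // so it adds no extra pass, and the rows it drops never reach the
            // join's shuffle.
            JavaPairRDD<String, Tuple2<Optional<Integer>, Optional<Integer>>> joined =
                    a.filter(FilterBeforeJoinExample::where)
                     .fullOuterJoin(b.filter(FilterBeforeJoinExample::where));
            return joined.collectAsMap();
        }
    }

    public static void main(String[] args) {
        // Only k2 survives the filters on both sides, so it is the only key joined.
        System.out.println(run());
    }
}
```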

A more specific answer depends on the details of your use case. If your RDDs are already partitioned with the same partitioner (via partitionBy; note that repartition does not assign a partitioner), it's fine to join first and then filter, because records with the same key in both RDDs already sit in matching partitions, so the join does not need to shuffle them.
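A minimal sketch of a co-partitioned join, assuming the Spark 2.x+ Java API (where `partitioner()` returns `org.apache.spark.api.java.Optional`). `HashPartitioner(4)` is an arbitrary choice, and the class name and data are hypothetical. The shared partitioner is also passed explicitly to fullOuterJoin so the result is guaranteed to use it.

```java
import java.util.Arrays;

import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;

public class CoPartitionedJoinExample {

    // Returns the partitioner of the filtered join result (absent if none was kept).
    static Optional<Partitioner> run() {
        SparkConf conf = new SparkConf().setAppName("co-partitioned-join").setMaster("local[2]");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            Partitioner p = new HashPartitioner(4);
            // partitionBy (unlike repartition) attaches a partitioner to the RDD;
            // cache so the partitionBy shuffle is not recomputed on reuse.
            JavaPairRDD<String, String> a = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<String, String>("k1", "a1"),
                    new Tuple2<String, String>("k2", "a2"))).partitionBy(p).cache();
            JavaPairRDD<String, String> b = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<String, String>("k2", "b2"),
                    new Tuple2<String, String>("k3", "b3"))).partitionBy(p).cache();
            // Both inputs already use p, so each output partition reads exactly one
            // partition from each side: the join itself needs no further shuffle.
            JavaPairRDD<String, Tuple2<Optional<String>, Optional<String>>> joined =
                    a.fullOuterJoin(b, p);
            // Filtering after the join is then cheap, and filter keeps the partitioner.
            JavaPairRDD<String, Tuple2<Optional<String>, Optional<String>>> kept =
                    joined.filter(t -> t._2()._1().isPresent());
            System.out.println(kept.collectAsMap());
            return kept.partitioner();
        }
    }

    public static void main(String[] args) {
        // The HashPartitioner(4) is expected to survive both the join and the filter.
        System.out.println(run());
    }
}
```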

After that, you do mapToPair. It's not clear from the question whether you change the keys of the pairs. If you do not, it's much better to use mapValues, because it preserves the partitioning, which can be useful for the next step (for example, another join or a reduceByKey on the same keys).
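A minimal sketch comparing the two, assuming the Spark 2.x+ Java API. It applies the same hypothetical side-selection rule (chooseSide: left if present, else right) once with mapValues and once with mapToPair, then checks which result still carries a partitioner. Class and helper names are illustrative.

```java
import java.util.Arrays;

import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;

public class MapValuesExample {

    // Picks the left value when present, otherwise the right one. In a full
    // outer join at least one side is always present, so get() is safe here.
    static String chooseSide(Tuple2<Optional<String>, Optional<String>> v) {
        return v._1().isPresent() ? v._1().get() : v._2().get();
    }

    // Returns {partitioner kept by mapValues?, partitioner kept by mapToPair?}.
    static boolean[] run() {
        SparkConf conf = new SparkConf().setAppName("mapValues-vs-mapToPair").setMaster("local[2]");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            Partitioner p = new HashPartitioner(4);
            JavaPairRDD<String, String> a = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<String, String>("k1", "a1"),
                    new Tuple2<String, String>("k2", "a2"))).partitionBy(p);
            JavaPairRDD<String, String> b = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<String, String>("k2", "b2"),
                    new Tuple2<String, String>("k3", "b3"))).partitionBy(p);
            JavaPairRDD<String, Tuple2<Optional<String>, Optional<String>>> joined =
                    a.fullOuterJoin(b, p);

            // Keys untouched, values rewritten: mapValues keeps joined's partitioner.
            JavaPairRDD<String, String> viaMapValues = joined.mapValues(MapValuesExample::chooseSide);
            // Same output, but Spark cannot know the keys are unchanged, so the
            // partitioner is dropped and a later keyed operation may reshuffle.
            JavaPairRDD<String, String> viaMapToPair =
                    joined.mapToPair(t -> new Tuple2<>(t._1(), chooseSide(t._2())));

            System.out.println(viaMapValues.collectAsMap());
            return new boolean[] {
                    viaMapValues.partitioner().isPresent(),
                    viaMapToPair.partitioner().isPresent()
            };
        }
    }

    public static void main(String[] args) {
        boolean[] kept = run();
        // Expected: mapValues keeps the partitioner, mapToPair drops it.
        System.out.println("mapValues: " + kept[0] + ", mapToPair: " + kept[1]);
    }
}
```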