
I am trying to use filter() inside map(), but I get this Spark exception:

RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

I know that Spark doesn't allow nested transformations, actions, or RDDs, so can anyone suggest an alternative way to do this without nesting them? I have an RDD whose elements look like this:

 JavaRDD< String[]> RDD

I try to map it, passing a list as an argument; this list contains JavaPairRDDs:

List<JavaPairRDD<String,String>> list
JavaRDD< String[]> result = RDD.map(new modifyRDD(list));

Here is the modifyRDD class those lines refer to:

public static class modifyRDD implements Function<String[], String[]> {

    List<JavaPairRDD<String,String>> list;

    public modifyRDD(List<JavaPairRDD<String,String>> list) { this.list = list; }

    public String[] call(String[] t) {

        String[] s = t;

        for (int i = 0; i < NB_TD; i++) {
            int j = i;
            // select the RDD in the list that corresponds to the current index
            JavaPairRDD<String,String> rdd_i = list.get(i);
            String previousElement = s[j];

            JavaPairRDD<String,String> currentRDD = rdd_i.filter(line -> line._1().equals(previousElement));

            String newElement = currentRDD.first()._2();

            s[j] = newElement;
        }

        return s;
    }
}

The problem is in this line:

  JavaPairRDD<String,String> currentRDD =  rdd_i.filter(line -> line._1().equals(previousElement));

Here is an example. Suppose the list contains 2 PairRDDs:

list={PairRDD1={(a,b)(c,d)},PairRDD2={(u,v)(x,y)}..}

and the RDD that I want to map contains:

 JavaRDD< String[]> RDD = {[a,u],[c,x],[a,x].....}

The result that I want after map() is:

 JavaRDD< String[]> result = {[b,v],[d,y],[b,y].....}
Have you read the error message and the corresponding JIRA? Spark doesn't support nested actions and transformations, the same way it doesn't support nested RDDs. - zero323
Yes, I did, but what can I do as an alternative solution? - ham
This problem has been covered so many times on SO... Short answer: a) if the RDD is small, collect it and use a local variable or a broadcast; b) if the RDD is large, express the problem as a join. - zero323
It is large in my case. Could you give me steps that I can follow, please? - ham
@zero323 As you may have noticed, I am using a list of pairRDDs inside the map function, because for every index I need a different pairRDD. So how can I join my RDD with all of those pairRDDs? (In the example I have 2 pairRDDs, but in my case I can't know in advance how many pairRDDs are in the list.) - ham

1 Answer


I changed the type of the list from List<JavaPairRDD<String,String>> to List<List<Tuple2<String,String>>> to avoid dealing with RDDs inside map(). Now I get no exception (of course, because there are no nested transformations any more), but I am not sure whether the new code is efficient: each List<Tuple2<String,String>> is large, and to find an element I use a for loop, which means scanning the whole List<Tuple2<String,String>> for every lookup. I would appreciate remarks on this use of the for loop and suggestions to improve it. Thank you.

This is the map() function after the modification:

  public static class modifyRDD implements Function<String[], String[]> {

    List<List<Tuple2<String,String>>> list;

    public modifyRDD(List<List<Tuple2<String,String>>> list) { this.list = list; }

    public String[] call(String[] t) {

        String[] s = t;

        for (int i = 0; i < NB_TD; i++) {

            // select the lookup list that corresponds to the current index
            List<Tuple2<String,String>> list_i = list.get(i);
            String previousElement = s[i];
            String newElement = "";

            // linear scan: the last matching key wins, "" if no key matches
            for (int k = 0; k < list_i.size(); k++) {
                Tuple2<String,String> sk1 = list_i.get(k);
                if (sk1._1.equals(previousElement)) { newElement = sk1._2; }
            }

            s[i] = newElement;
        }
        return s;
    }
  }
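
On the efficiency question: the inner for loop scans a whole lookup list for every element of every row. One possible way to avoid that is to turn each lookup list into a hash map once and then do a constant-time lookup per element. The sketch below is plain Java with no Spark dependency, so it only illustrates the lookup itself. The class name LookupSketch, the helpers toMaps and remap, and the use of two-element String arrays in place of Tuple2<String,String> are assumptions made for the illustration, not part of the original code. In a Spark job, the maps could presumably come from JavaPairRDD.collectAsMap() and be shared with the executors through JavaSparkContext.broadcast(), provided they fit in memory; if they do not, the join suggested in the comments is the other option.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class: replaces the linear scan with hash-map lookups.
class LookupSketch {

    // Build one hash map per column. Each pair is a two-element array
    // {key, value}, standing in for Tuple2<String,String>.
    static List<Map<String, String>> toMaps(List<List<String[]>> pairLists) {
        List<Map<String, String>> maps = new ArrayList<>();
        for (List<String[]> pairs : pairLists) {
            Map<String, String> m = new HashMap<>();
            for (String[] p : pairs) {
                m.put(p[0], p[1]); // a later duplicate key overwrites an earlier one
            }
            maps.add(m);
        }
        return maps;
    }

    // Replace row[i] with its value in the i-th map; unknown keys become "",
    // which mirrors the default in the loop-based version.
    static String[] remap(String[] row, List<Map<String, String>> maps) {
        String[] out = Arrays.copyOf(row, row.length); // leave the input row untouched
        for (int i = 0; i < maps.size() && i < out.length; i++) {
            out[i] = maps.get(i).getOrDefault(row[i], "");
        }
        return out;
    }

    public static void main(String[] args) {
        // The two lookup tables from the example in the question.
        List<List<String[]>> pairLists = Arrays.asList(
            Arrays.asList(new String[]{"a", "b"}, new String[]{"c", "d"}),
            Arrays.asList(new String[]{"u", "v"}, new String[]{"x", "y"}));
        List<Map<String, String>> maps = toMaps(pairLists);

        for (String[] row : new String[][]{{"a", "u"}, {"c", "x"}, {"a", "x"}}) {
            System.out.println(Arrays.toString(remap(row, maps)));
        }
        // prints [b, v], [d, y] and [b, y], one per line
    }
}
```

With this shape, the body of call() would reduce to a single remap-style loop over the broadcast maps, and the cost per row no longer depends on the size of the lookup tables.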