I am trying to use filter() inside map(), but I get this Spark exception:
RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
I know that Spark doesn't allow nested transformations, actions, or RDDs, so can anyone suggest an alternative way to do this (without nested transformations or actions)? I have an RDD whose elements are string arrays:
JavaRDD<String[]> RDD
I try to map it, passing a list as an argument to the mapping function; this list contains JavaPairRDDs:
List<JavaPairRDD<String,String>> list
JavaRDD<String[]> result = RDD.map(new modifyRDD(list));
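For reference, here is a minimal, self-contained sketch of how these inputs could be built, using the toy data from the example at the end of this question (the variable names, the local master, and the parallelize() calls are just placeholders for my real setup):

import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

JavaSparkContext sc = new JavaSparkContext("local[*]", "example");

// The RDD to map: each element is a String[] with one value per position.
JavaRDD<String[]> RDD = sc.parallelize(Arrays.asList(
        new String[]{"a", "u"}, new String[]{"c", "x"}, new String[]{"a", "x"}));

// One lookup JavaPairRDD per position; list.get(i) is used for position i.
JavaPairRDD<String, String> pairRDD1 = sc.parallelizePairs(Arrays.asList(
        new Tuple2<>("a", "b"), new Tuple2<>("c", "d")));
JavaPairRDD<String, String> pairRDD2 = sc.parallelizePairs(Arrays.asList(
        new Tuple2<>("u", "v"), new Tuple2<>("x", "y")));
List<JavaPairRDD<String, String>> list = Arrays.asList(pairRDD1, pairRDD2);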
The modifyRDD() function is defined as follows:
public static class modifyRDD implements Function<String[], String[]> {
    List<JavaPairRDD<String, String>> list;

    public modifyRDD(List<JavaPairRDD<String, String>> list) {
        this.list = list;
    }

    public String[] call(String[] t) {
        String[] s = t;
        for (int i = 0; i < NB_TD; i++) {   // NB_TD: number of entries per array (defined elsewhere)
            int j = i;
            // select the appropriate RDD from the RDDs list for the current index
            JavaPairRDD<String, String> rdd_i = list.get(i);
            String previousElement = s[j];
            // this nested filter() on rdd_i inside map() is what triggers SPARK-5063
            JavaPairRDD<String, String> currentRDD = rdd_i.filter(line -> line._1().equals(previousElement));
            String newElement = currentRDD.first()._2();
            s[j] = newElement;
        }
        return s;
    }
}
So the problem is this line (the filter() transformation is invoked inside the map() function, which runs on the executors, while rdd_i is only a reference that is valid on the driver):
JavaPairRDD<String,String> currentRDD = rdd_i.filter(line -> line._1().equals(previousElement));
Now, to give an example, suppose that list contains two PairRDDs:
list = {PairRDD1 = {(a,b), (c,d)}, PairRDD2 = {(u,v), (x,y)}, ...}
and the RDD that I want to map contains:
JavaRDD<String[]> RDD = {[a,u], [c,x], [a,x], ...}
The result that I want after map() is:
JavaRDD<String[]> result = {[b,v], [d,y], [b,y], ...}
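One direction I have been considering, to avoid the nested RDD operation, is to collect each JavaPairRDD into a java.util.Map on the driver and broadcast it, so the lookup inside map() only touches local maps and no RDD. A rough sketch (it assumes each pair RDD is small enough to fit in memory, that every element always has a matching key as in my current code, and it reuses sc, RDD, and list from above; broadcastList is just a name I made up):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.broadcast.Broadcast;

// Collect each pair RDD into a local Map and broadcast it to the executors.
List<Broadcast<Map<String, String>>> broadcastList = new ArrayList<>();
for (JavaPairRDD<String, String> rdd_i : list) {
    broadcastList.add(sc.broadcast(rdd_i.collectAsMap()));
}

// The map() now only reads broadcast values: no nested transformations or actions.
JavaRDD<String[]> result = RDD.map(t -> {
    String[] s = t.clone();
    for (int i = 0; i < s.length; i++) {
        s[i] = broadcastList.get(i).value().get(s[i]);   // replace each value by its mapping
    }
    return s;
});

Would something like this be a reasonable replacement for the filter()/first() lookup, or is there a better way that keeps everything as RDD operations (without collecting to the driver)?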