7
votes

RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

As the error says, I'm trying to apply a map transformation to a JavaRDD object inside the main map function. How can this be done with Apache Spark?

The main JavaPairRDD object (TextFile and Word are defined classes):

JavaPairRDD<TextFile, JavaRDD<Word>> filesWithWords = new...

And the map function:

filesWithWords.map(textFileJavaRDDTuple2 -> textFileJavaRDDTuple2._2().map(word -> new Word(word.getText(), (long) textFileJavaRDDTuple2._1().getText().split(word.getText()).length)));

I also tried foreach instead of map, but that does not work either. (And of course I searched for SPARK-5063.)

3 Answers

9
votes

Just as nested operations on RDDs are not supported, nested RDD types are not possible in Spark. RDDs are only defined at the driver, where, in combination with their SparkContext, they can schedule operations on the data they represent.

So, the root cause we need to address in this case is the datatype:

JavaPairRDD<TextFile, JavaRDD<Word>> filesWithWords

This type has no valid use in Spark. Depending on the use case, which is not further explained in the question, it should become one of:

A collection of RDDs, with the text file they refer to:

Map<TextFile,RDD<Word>>

Or an RDD of (TextFile, Word) pairs, keyed by text file:

JavaPairRDD<TextFile, Word>

Or an RDD that pairs each TextFile with the list of its words:

JavaPairRDD<TextFile, List<Word>>

Once the type is corrected, the issues with the nested RDD operations will be naturally solved.
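To illustrate the last shape, a TextFile paired with a plain List<Word>, here is a minimal sketch in plain Java with no Spark dependency. The names FlatWordCounts, wordsOf and wordsByFile are hypothetical, and Word is a simplified stand-in for the class in the question. The point is only that the per-file work is an ordinary function of the file's text that touches no RDD, so in Spark it could be the body of a single map over an RDD of files. A local Map stands in for that RDD here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FlatWordCounts {
    // Hypothetical, simplified stand-in for the question's Word class.
    static final class Word {
        final String text;
        final long count;

        Word(String text, long count) {
            this.text = text;
            this.count = count;
        }
    }

    // Pure function of one file's text. It uses only local collections,
    // never an RDD, which is what makes it legal inside a transformation.
    static List<Word> wordsOf(String fileText) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String token : fileText.trim().split("\\s+")) {
            if (!token.isEmpty()) {
                counts.merge(token, 1L, Long::sum);
            }
        }
        List<Word> words = new ArrayList<>();
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            words.add(new Word(e.getKey(), e.getValue()));
        }
        return words;
    }

    // Local equivalent of the "file with its list of words" shape:
    // each path is paired with a plain List<Word>, not with another RDD.
    static Map<String, List<Word>> wordsByFile(Map<String, String> textByPath) {
        Map<String, List<Word>> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : textByPath.entrySet()) {
            result.put(e.getKey(), wordsOf(e.getValue()));
        }
        return result;
    }
}
```

The inner value is a java.util.List, which can be serialized and shipped to executors, whereas a JavaRDD is only a handle that is meaningful on the driver.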

3
votes

When I got to this exact same point in my learning curve for Spark (tried and failed to use nested RDDs) I switched to DataFrames and was able to accomplish the same thing using joins instead. Also, in general, DataFrames appear to be almost twice as fast as RDDs -- at least for the work I have been doing.

0
votes

@maasg At first I used JavaPairRDD<TextFile, JavaRDD<Word>>, and it didn't work; as you and @David Griffin said, it's not possible yet. The models were:

TextFile(String path, String text)

Word(String word, Integer count)

Now I'm using JavaRDD<TextFile>, and the models have changed to:

TextFile(String path, String text, List<Word> wordList)

Word(String word, Integer count)

Finally,

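// Collect the known words to the driver as a plain List, each with a count of zero.
List<Word> countDrafts = wordCount.map(v11 -> new Word(v11._1(), (long) 0)).collect();
// Attach that local list to every TextFile; a List can be captured by the closure, an RDD cannot.
JavaRDD<TextFile> ft = fileTexts.map(v11 -> new TextFile(v11._1(), v11._2(), countDrafts));
// Note: the Word objects created here are not stored anywhere, so the computed counts are discarded.
ft.foreach(textFile -> textFile.getWordList().forEach(word -> new Word(word.getText(), getWordCountFromText(textFile.getText(), word.getText()))));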

The getWordCountFromText() function counts the occurrences of a word in the text of a TextFile object. Unfortunately it does not use Spark's reduce method; it counts the classic way.
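The body of getWordCountFromText() is not shown, so the following is only a guess at what a "classic" version might look like. It assumes case-sensitive, non-overlapping substring matching, and the class name WordCounter is hypothetical. It uses indexOf rather than the split(word).length approach from the question, because split treats the word as a regular expression and its array length is not, in general, the number of occurrences.

```java
class WordCounter {
    // Hypothetical sketch of getWordCountFromText: counts non-overlapping,
    // case-sensitive occurrences of `word` as a substring of `text`.
    static long getWordCountFromText(String text, String word) {
        if (text == null || word == null || word.isEmpty()) {
            return 0L;
        }
        long count = 0;
        int from = 0;
        // Jump from match to match; skipping word.length() avoids overlaps.
        while ((from = text.indexOf(word, from)) != -1) {
            count++;
            from += word.length();
        }
        return count;
    }
}
```

Because this matches substrings, "cat" is also found inside "concatenate". Matching whole words only would need tokenization or word-boundary checks, which this sketch leaves out.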

By the way, I will try DataFrames in the next few days, but I have little time for this.

Thank you all.