I am creating an inverted index of documents, where the output should contain a word (from the text file) followed by all the files it appeared in. Something like

[word1: file1.txt file2.txt] [word2: file2.txt file3.txt]

I have written the code but it throws me this error.

for k, v in iterator:
TypeError: <lambda>() takes exactly 2 arguments (1 given)

Code:

from pyspark import SparkContext    
sc = SparkContext("local", "app")

path = '/ebooks'
rdd = sc.wholeTextFiles(path)

output = rdd.flatMap(lambda (file,contents):contents.lower().split())\
            .map(lambda file,word: (word,file))\
            .reduceByKey(lambda a,b: a+b)
print output.take(10)

I cannot figure out a way to emit both the key and the value (the word and the filename) in the map step. How can I go about it?

In MapReduce, a (word, filename) pair can be emitted from the mapper (with the filename as the value), but how can this be done in Spark?

2 Answers

I haven't tested this on dummy data, but looking at your code, I think the following modification should work:

output = rdd.flatMap(lambda (file, contents): [(file, word) for word in contents.lower().split()])\
            .map(lambda (file, word): (word, [file]))\
            .reduceByKey(lambda a, b: a + b)
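
If you are running under Python 3, note that tuple unpacking in lambda parameters was removed, so the same pipeline has to index into the pairs instead; a rough sketch, assuming the same rdd from wholeTextFiles:

# Python 3 sketch: lambdas can no longer unpack tuple parameters,
# so index into the (file, contents) pair and the (file, word) pair instead.
output = rdd.flatMap(lambda fc: [(fc[0], word) for word in fc[1].lower().split()]) \
            .map(lambda fw: (fw[1], [fw[0]])) \
            .reduceByKey(lambda a, b: a + b)

print(output.take(10))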

"I cannot figure out a way to emit both key and value"

Use flatMapValues:

rdd = sc.wholeTextFiles("README.md")

rdd.flatMapValues(lambda content: content.lower().split()).take(3)

# [('file:/spark/README.md', '#'),
#  ('file:/spark/README.md', 'apache'),
#  ('file:/spark/README.md', 'spark')]

With flatMap you can do the same:

rdd.flatMap(
    lambda fc: ((fc[0], s) for s in fc[1].lower().split()))


# [('file:/spark/README.md', '#'),
#  ('file:/spark/README.md', 'apache'),
#  ('file:/spark/README.md', 'spark')]
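
From there, the inverted index the question asks for only needs the pairs swapped and grouped; a sketch along these lines (the file paths in the output depend on where README.md lives):

# Swap to (word, file), drop duplicate pairs, and collect the files per word.
inverted = (rdd.flatMapValues(lambda content: content.lower().split())
               .map(lambda fw: (fw[1], fw[0]))   # (word, file)
               .distinct()                       # one entry per (word, file)
               .groupByKey()                     # word -> iterable of files
               .mapValues(list))

inverted.take(2)
# e.g. [('apache', ['file:/spark/README.md']), ('spark', ['file:/spark/README.md'])]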