I've written a simple MapReduce example in Python. When the input is a single file, for example a text
file, I run the code with the usual pattern: cat <data> | map | sort | reduce
In my case that is: cat data | ./mapper.py | sort | ./reducer.py
and everything works correctly.
I then changed my mapper and reducer to read data from a directory that contains .gz files, so I need to pass the path of the directory as input. With the data stored in dat/, I tried the following terminal command:
cat dat/ | ./mapper.py | sort | ./reducer.py
but I got this error:
cat: dat/: Is a directory
Traceback (most recent call last):
  File "./mapper.py", line 9, in <module>
    for filename in glob.glob(sys.stdin + '*.gz'):
TypeError: unsupported operand type(s) for +: 'file' and 'str'
How can I pass a directory as input to MapReduce in Python?
The following is my code:
mapper.py
#!/usr/bin/env python
import sys
#import timeit
import glob
import gzip

QUALITY = '01459'
MISSING = '+9999'
for filename in glob.glob(sys.stdin + '*.gz'):
    f = gzip.open(filename, 'r')
    for line in f:
        val = line.strip()
        (year, temp, q) = (val[15:19], val[87:92], val[92:93])
        if temp != MISSING and q in QUALITY:
            print " %s\t%s" % (year, temp)
reducer.py
#!/usr/bin/env python
import sys

max_val = -sys.maxint
key = ''
for line in sys.stdin:
    (key, val) = line.strip().split('\t')
    max_val = max(max_val, int(val))
print "The last IF %s\t%s" % (key, max_val)
zcat data/*.gz | ./mapper.py | sort | ./reducer.py – philantrovert
I have .gz files and I use a for loop to read them all, as I've done before, but not in the MapReduce model. But I think your suggestion passes the exact path of every .gz file in the directory. Am I right? – Soheil Pourbafrani
zcat (gzip + cat) extracts the .gz file and passes its content to your mapper. Maybe this would have worked for your .gz files without changing your mapper. – Chickenmarkus
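For reference, the traceback comes from sys.stdin being a file object rather than a path string, so it cannot be concatenated with '*.gz'. If you would rather keep the directory handling inside the mapper instead of using zcat, one option is to pass the directory as a command-line argument. The following is only a minimal sketch under a few assumptions: it is written for Python 3 (the question's code is Python 2), the helper names map_lines and iter_gz_lines are made up for illustration, and the length check on each record is an added guard that is not in the original mapper.

```python
#!/usr/bin/env python3
"""Mapper sketch: reads every .gz file in a directory given as an argument."""
import glob
import gzip
import os
import sys

QUALITY = '01459'
MISSING = '+9999'


def map_lines(lines):
    """Yield (year, temp) for each record with a usable temperature."""
    for line in lines:
        val = line.strip()
        if len(val) < 93:
            # Added guard (not in the original): skip truncated records.
            continue
        year, temp, q = val[15:19], val[87:92], val[92:93]
        if temp != MISSING and q in QUALITY:
            yield year, temp


def iter_gz_lines(directory):
    """Yield text lines from every .gz file found directly in `directory`."""
    for filename in sorted(glob.glob(os.path.join(directory, '*.gz'))):
        # 'rt' decompresses and decodes to str, so slicing works as on text.
        with gzip.open(filename, 'rt') as f:
            for line in f:
                yield line


def main(directory):
    # Emit tab-separated key/value pairs for the sort and reduce stages.
    for year, temp in map_lines(iter_gz_lines(directory)):
        print("%s\t%s" % (year, temp))


if __name__ == '__main__':
    if len(sys.argv) == 2:
        main(sys.argv[1])
    else:
        sys.stderr.write("usage: mapper.py <directory>\n")
```

With this version there is no cat at the front of the pipeline; the call would look like ./mapper.py dat/ | sort | ./reducer.py. The zcat approach from the comments is the other way round: it keeps the original mapper that loops over sys.stdin and lets the shell do the decompression.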