2
votes

I've written a simple MapReduce example in Python. When the input is a single file, such as a text file, the code runs with the usual pattern cat <data> | map | sort | reduce. In my case that is cat data | ./mapper.py | sort | ./reducer.py, and everything works correctly.

I then changed my mapper and reducer to read data from a directory that contains .gz files, so I need to pass the path of the directory as input. With the data in dat/, I tried cat dat/ | ./mapper.py | sort | ./reducer.py, but got this error:

cat: dat/: Is a directory
Traceback (most recent call last):
  File "./mapper.py", line 9, in <module>
    for filename in glob.glob(sys.stdin + '*.gz'):
TypeError: unsupported operand type(s) for +: 'file' and 'str'

How can I pass a directory as input to MapReduce in Python?

The following is my code:

mapper.py

#!/usr/bin/env python
import sys
#import timeit
import glob
import gzip

QUALITY = '01459'
MISSING = '+9999'
for filename in glob.glob(sys.stdin + '*.gz'):
    f = gzip.open(filename, 'r')
    for line in f:
        val = line.strip()
        (year, temp, q) = (val[15:19], val[87:92], val[92:93])
        if temp != MISSING and q in QUALITY:
            print " %s\t%s" % (year, temp)

reducer.py

#!/usr/bin/env python
import sys

max_val = -sys.maxint
key = ''
for line in sys.stdin:
    (key, val) = line.strip().split('\t')
    max_val = max(max_val, int(val))
print "The last IF %s\t%s" % (key, max_val)
zcat data/*.gz | ./mapper.py | sort | ./reducer.py - philantrovert
@philantrovert Thanks. Note that my mapper assumes the input is the path of the directory containing the .gz files, and it loops over them to read each one, as I did before outside the MapReduce model. I think your suggestion instead passes the paths of all the .gz files in the directory. Am I right? - Soheil Pourbafrani
zcat (gzip + cat) decompresses the .gz files and passes their contents to your mapper. This might have worked for your .gz files without changing your mapper at all. - Chickenmarkus

2 Answers

1
votes

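The expression glob.glob(sys.stdin + '*.gz') needs a string holding the directory name, but sys.stdin is a file object, which is why the concatenation raises the TypeError. If you want to keep passing the directory through the pipe, send its name as a string (echo) instead of file contents (cat), and read it inside the mapper, e.g. with sys.stdin.readline().strip(), before building the glob pattern: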

$ echo dat/ | ./mapper.py | sort | ./reducer.py
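
A minimal sketch of how the mapper could turn the piped directory name into a list of files. The helper name gz_files_from_stream is hypothetical (it is not in the original code), and the sketch assumes Python 3 and that the directory name arrives as the first line on stdin:

```python
import glob
import os


def gz_files_from_stream(stream):
    """Read a directory name from `stream` and return its .gz files."""
    # readline() returns the text sent by `echo`; strip() drops the newline.
    directory = stream.readline().strip()
    # os.path.join works whether or not the name ends with a slash.
    return sorted(glob.glob(os.path.join(directory, '*.gz')))
```

In mapper.py the loop header would then read for filename in gz_files_from_stream(sys.stdin): instead of concatenating sys.stdin with a string.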

However, why pass arguments through a pipe at all? Arguments are usually passed on the command line and read in Python via sys.argv (or, even better, via a parser such as argparse).
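
As a rough sketch of the argparse route, the mapper could look something like the following. It assumes Python 3 (the question's code is Python 2), and the names map_directory and main, the optional directory argument defaulting to the current directory, and the length check for short records are illustrative choices rather than part of the original code:

```python
#!/usr/bin/env python3
import argparse
import glob
import gzip
import os

QUALITY = '01459'
MISSING = '+9999'


def map_directory(directory):
    """Yield (year, temp) pairs from every .gz file in `directory`."""
    for filename in sorted(glob.glob(os.path.join(directory, '*.gz'))):
        # 'rt' decodes the file as text, so slicing yields str, not bytes.
        with gzip.open(filename, 'rt') as f:
            for line in f:
                val = line.strip()
                if len(val) < 93:
                    continue  # record too short to hold all three fields
                year, temp, q = val[15:19], val[87:92], val[92:93]
                if temp != MISSING and q in QUALITY:
                    yield year, temp


def main(argv=None):
    parser = argparse.ArgumentParser(
        description='Mapper over a directory of .gz files')
    # Optional positional argument; falls back to the current directory.
    parser.add_argument('directory', nargs='?', default=os.curdir,
                        help='directory containing the .gz files')
    args = parser.parse_args(argv)
    for year, temp in map_directory(args.directory):
        print('%s\t%s' % (year, temp))


if __name__ == '__main__':
    main()
```

With a mapper shaped like this, the pipeline would be started as ./mapper.py dat/ | sort | ./reducer.py, with no cat or echo in front.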

0
votes

To get the path of the current working directory, use:

import os
import gzip
path = os.getcwd()

You can list all the files in this directory and keep only the .gz ones:

filenames = os.listdir(path)
# keep only the files with a .gz extension
filenames = [file_name for file_name in filenames if file_name.endswith('.gz')]

You can then iterate over the files with:

for filename in filenames:
    # os.path.join inserts the path separator that os.getcwd() does not include
    f = gzip.open(os.path.join(path, filename), 'r')