
I've written a simple MapReduce example in Python. When the input is a file, for example a text file, I run the code with the pattern cat <data> | map | sort | reduce. In my case that is cat data | ./mapper.py | sort | ./reducer.py, and everything works.

But then I changed my mapper and reducer to read data from a directory that contains .gz files, so I need to pass the path of the directory as input. I tried the command cat dat/ | ./mapper.py | sort | ./reducer.py, where dat/ is the directory containing the data, but got this error:

cat: dat/: Is a directory
Traceback (most recent call last):
  File "./mapper.py", line 9, in <module>
    for filename in glob.glob(sys.stdin + '*.gz'):
TypeError: unsupported operand type(s) for +: 'file' and 'str'

How can I pass a directory as input to Mapreduce in Python?

The following is my code (mapper.py first, then reducer.py):


#!/usr/bin/env python
import sys
#import timeit
import glob
import gzip

QUALITY = '01459'
MISSING = '+9999'
for filename in glob.glob(sys.stdin + '*.gz'):
    f = gzip.open(filename, 'r')
    for line in f:
        val = line.strip()
        (year, temp, q) = (val[15:19], val[87:92], val[92:93])
        if temp != MISSING and q in QUALITY:
            print " %s\t%s" % (year, temp)


#!/usr/bin/env python
import sys

max_val = -sys.maxint
key = ''
for line in sys.stdin:
    (key, val) = line.strip().split('\t')
    max_val = max(max_val, int(val))
print "The last IF %s\t%s" % (key, max_val)
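The reducer above keeps a single running maximum across all input lines, so it prints the last key it saw together with the overall maximum. If the goal is the maximum temperature per year (an assumption about the intent), a common streaming pattern relies on sort having placed equal keys on adjacent lines and groups them with itertools.groupby. A minimal Python 3 sketch; `parse` and `reduce_max` are illustrative names:

```python
#!/usr/bin/env python3
"""Reducer sketch: maximum value per key.

Assumes input lines look like "<year>\t<temp>" and arrive sorted by key,
as they do after `| sort |` in the pipeline.
"""
import sys
from itertools import groupby


def parse(lines):
    """Yield (key, int_value) pairs, skipping blank lines."""
    for line in lines:
        line = line.strip()
        if line:
            key, val = line.split('\t')
            yield key, int(val)  # int() accepts a leading '+', e.g. '+0022'


def reduce_max(lines):
    """Yield (key, max_value) once per run of identical adjacent keys."""
    for key, group in groupby(parse(lines), key=lambda kv: kv[0]):
        yield key, max(val for _, val in group)


if __name__ == '__main__':
    for key, max_val in reduce_max(sys.stdin):
        print('%s\t%d' % (key, max_val))
```

Because groupby only merges adjacent equal keys, this reducer gives wrong results on unsorted input; the sort step in the pipeline is what makes it correct.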
zcat data/*.gz | ./mapper.py | sort | ./reducer.py (philantrovert)
@philantrovert Thanks. Note that my mapper assumes the input is the path of a directory containing .gz files, and it uses a for loop to read them all, as I've done before outside the MapReduce model. But I think your suggestion passes the exact path of every .gz file in the directory. Am I right? (Soheil Pourbafrani)
zcat (gunzip + cat) decompresses the .gz files and passes their contents to your mapper on stdin. This would probably have worked for your .gz files without changing your mapper. (Chickenmarkus)
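A sketch of the mapper the comments have in mind, assuming the record layout and field offsets from the question's mapper: because zcat has already decompressed the files, the mapper goes back to reading plain text lines from stdin. It is written for Python 3; the `map_lines` name and the length check for blank or truncated lines are additions for illustration.

```python
#!/usr/bin/env python3
"""Mapper sketch for: zcat dat/*.gz | ./mapper.py | sort | ./reducer.py"""
import sys

QUALITY = '01459'
MISSING = '+9999'


def map_lines(lines):
    """Yield (year, temp) pairs for valid records (offsets from the question)."""
    for line in lines:
        val = line.strip()
        if len(val) < 93:  # skip blank or truncated records
            continue
        year, temp, q = val[15:19], val[87:92], val[92:93]
        if temp != MISSING and q in QUALITY:
            yield year, temp


if __name__ == '__main__':
    # zcat has already decompressed the data, so stdin carries plain text lines
    for year, temp in map_lines(sys.stdin):
        print('%s\t%s' % (year, temp))
```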

2 Answers


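The line for filename in glob.glob(sys.stdin + '*.gz'): expects a string, but sys.stdin is a file object, which is exactly what the TypeError says. The mapper first has to read the string from it, e.g. with sys.stdin.readline().strip(). Then pass a string (echo) instead of file contents (cat):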

$ echo dat/ | ./mapper.py | sort | ./reducer.py
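For the echo approach to work, the mapper has to read the directory path from stdin before globbing. A minimal Python 3 sketch under that assumption, reusing the question's field offsets; `map_directory` is a hypothetical helper name, and os.path.join makes the trailing slash in dat/ optional.

```python
#!/usr/bin/env python3
"""Mapper sketch for: echo dat/ | ./mapper.py | sort | ./reducer.py"""
import glob
import gzip
import os
import sys

QUALITY = '01459'
MISSING = '+9999'


def map_directory(directory):
    """Yield (year, temp) pairs from every .gz file in `directory`."""
    pattern = os.path.join(directory, '*.gz')
    for filename in sorted(glob.glob(pattern)):
        # 'rt' opens the compressed file in text mode, so lines are str
        with gzip.open(filename, 'rt') as f:
            for line in f:
                val = line.strip()
                if len(val) < 93:  # skip blank or truncated records
                    continue
                year, temp, q = val[15:19], val[87:92], val[92:93]
                if temp != MISSING and q in QUALITY:
                    yield year, temp


if __name__ == '__main__':
    # sys.stdin is a file object, not a string: read the path from it first
    directory = sys.stdin.readline().strip()
    if not directory:
        print('usage: echo DIR | ./mapper.py', file=sys.stderr)
    else:
        for year, temp in map_directory(directory):
            print('%s\t%s' % (year, temp))
```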

However, why pass arguments via pipes? Arguments are usually passed directly on the command line and read in Python via sys.argv (or, better, via an argument parser such as argparse).
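A sketch of the argparse route in Python 3, so the pipeline becomes ./mapper.py dat/ | sort | ./reducer.py. The positional `directories` argument (one or more paths, defaulting to the current directory), the `map_record` helper and the `out` parameter (added only to make `main` testable) are illustrative choices, not part of the original code; the field offsets are the question's.

```python
#!/usr/bin/env python3
"""Mapper sketch taking the input directories as command-line arguments."""
import argparse
import glob
import gzip
import os
import sys

QUALITY = '01459'
MISSING = '+9999'


def map_record(line):
    """Return (year, temp) for a valid record, else None."""
    val = line.strip()
    if len(val) < 93:  # blank or truncated record
        return None
    year, temp, q = val[15:19], val[87:92], val[92:93]
    if temp != MISSING and q in QUALITY:
        return year, temp
    return None


def main(argv=None, out=sys.stdout):
    parser = argparse.ArgumentParser(description='Mapper over .gz files.')
    parser.add_argument('directories', nargs='*', default=['.'],
                        help='directories containing .gz files (default: .)')
    args = parser.parse_args(argv)  # argv=None means sys.argv[1:]
    for directory in args.directories:
        for filename in sorted(glob.glob(os.path.join(directory, '*.gz'))):
            with gzip.open(filename, 'rt') as f:
                for line in f:
                    pair = map_record(line)
                    if pair is not None:
                        out.write('%s\t%s\n' % pair)


if __name__ == '__main__':
    main()
```

Reading paths from the command line also lets argparse print a usage message for `./mapper.py --help`, which a path piped through stdin cannot do.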


To get the path of the current working directory, use:

import os
path = os.getcwd()

You can list all the files in this directory with:

filenames = os.listdir(path)
# keep only files with the .gz extension
filenames = [file_name for file_name in filenames if file_name.endswith('.gz')]

You can simply iterate over the files with:

for filename in filenames:
    # os.getcwd() has no trailing separator, so join the parts with os.path.join
    f = gzip.open(os.path.join(path, filename), 'r')
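The steps of this answer can be combined into one generator. A sketch in Python 3; `read_gz_dir` is a hypothetical name, and the os.path.isfile check is an extra guard (not in the answer) against subdirectories whose names happen to end in .gz.

```python
import gzip
import os


def read_gz_dir(path=None):
    """Yield decompressed lines from every .gz file directly inside `path`.

    Defaults to the current working directory, as in the answer.
    """
    path = path or os.getcwd()
    filenames = sorted(
        name for name in os.listdir(path)
        if name.endswith('.gz') and os.path.isfile(os.path.join(path, name))
    )
    for filename in filenames:
        # os.path.join supplies the separator that os.getcwd() leaves off
        with gzip.open(os.path.join(path, filename), 'rt') as f:
            for line in f:
                yield line.rstrip('\n')
```

Each yielded line can then be passed to the record parsing from the question's mapper.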