
I have a situation where I need to process a very large text file with the following format:

ID \t time \t duration \t Description \t status

I want to use MapReduce to help me process this file. I understand that MapReduce works on key/value pairs: the mapper outputs a key and some value, and the framework ensures that all values with the same key end up in one reducer.

What I want to end up with in a reducer is the rows whose times are within 1 hour of each other. In the reducer, I would then like to access all the other fields as well, such as ID, duration, and status, to do further processing. So I guess the value to output should be a list or something?

I have some Python code to process the input data (mapper.py):

#!/usr/bin/env python
import sys
import re

for line in sys.stdin:
    line = line.strip()
    portions = re.split(r'\t+', line)  # split the record on tabs
    time = portions[1]                 # POSIX timestamp (second column)
    # output key,value by printing to stdout for reducer.py to read in

Please note that the time in my data set is already in POSIX-time format.

How could I output the key/value pairs in the mapper to do that?

I'm still very new to MapReduce/Hadoop and appreciate all the help. Thank you in advance!


1 Answer


Here is a strategy:

  • from the mapper: emit three copies of each record and use secondary sorting (a minimal Python sketch follows this list):

    ( (composite key), value) =

    • ((hour of message - one hour, precise time of message), message)
    • ((hour of message, precise time of message), message)
    • ((hour of message + one hour, precise time of message), message)
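
Translated to your Hadoop Streaming setup, the mapper could look roughly like this. This is only a sketch: it assumes the question's tab-separated layout with a POSIX timestamp in seconds as the second field, and it encodes the composite key as two tab-separated fields in front of the original record.

#!/usr/bin/env python
# mapper.py -- sketch only; assumes field 2 holds a POSIX timestamp in seconds
import sys
import re

HOUR = 3600  # seconds

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    portions = re.split(r'\t+', line)
    t = int(portions[1])           # precise time of the message
    hour = t - (t % HOUR)          # start of the message's own hour bucket
    # emit three copies: own hour bucket plus the two neighbouring buckets
    for bucket in (hour - HOUR, hour, hour + HOUR):
        # composite key = (hour bucket, precise time); value = whole record
        print('%d\t%d\t%s' % (bucket, t, line))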

Now you need standard secondary sorting (streaming equivalents are sketched after the list):

  • setPartitioner to use only the first half of the key (hour of message)
  • setGroupingComparator to use only the first half of the key (hour of message)
  • setSortComparator to use the whole key (hour of message, precise time of message)
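
Those three settings correspond to the Java API (Job.setPartitionerClass, setGroupingComparatorClass, setSortComparatorClass). With Hadoop Streaming there is no grouping comparator to set; instead you get roughly the same effect by partitioning and sorting on key fields and letting the reducer script do its own grouping (as in the reducer sketch further down). Exact property names vary a bit between Hadoop versions, so treat this invocation as an illustration only; in_dir and out_dir are placeholders:

hadoop jar hadoop-streaming.jar \
    -D stream.num.map.output.key.fields=2 \
    -D mapreduce.partition.keypartitioner.options=-k1,1 \
    -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
    -D mapreduce.partition.keycomparator.options='-k1,1n -k2,2n' \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
    -mapper mapper.py -reducer reducer.py \
    -file mapper.py -file reducer.py \
    -input in_dir -output out_dir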

In the reducer: each group is guaranteed to contain every message within +/- 60 minutes of any message whose own hour is that group's bucket (and may contain messages up to 120 minutes away). The reducer sees all of the "precise time of message" values in sorted order, so within each group you can keep a sliding window of all messages seen within the past 60 minutes; a sketch follows.
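
A reducer along these lines could look like the following. Again this is only a sketch, assuming the two-field composite key emitted by the mapper sketch above; Streaming delivers the lines already sorted by (hour bucket, precise time), so the script groups by bucket itself:

#!/usr/bin/env python
# reducer.py -- sketch only; input arrives sorted by (hour bucket, precise time)
import sys
from collections import deque

WINDOW = 3600  # 60 minutes in seconds

current_bucket = None
window = deque()  # (time, fields) tuples from the past 60 minutes, oldest first

for line in sys.stdin:
    parts = line.rstrip('\n').split('\t')
    bucket, t, fields = parts[0], int(parts[1]), parts[2:]
    if bucket != current_bucket:
        current_bucket = bucket  # new group starts: reset the window
        window = deque()
    # evict everything more than 60 minutes older than the current record
    while window and t - window[0][0] > WINDOW:
        window.popleft()
    for prev_t, prev_fields in window:
        # every record here is within 1 hour of the current one;
        # fields = [ID, time, duration, Description, status]
        pass  # ... pairwise processing goes here ...
    window.append((t, fields))

One thing to watch for: because every record is emitted under three buckets, a given pair of records can meet in more than one group, so you may want to deduplicate, e.g. by only acting on a pair in the group whose bucket equals the earlier record's own hour.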

NOTE The above assumes that 60 minutes' worth of messages fits in the memory of a single reducer task. Otherwise you will need to resort to writing the window out to disk as part of the windowing function.

Update: the OP asked for further clarification of the windowing, so here we go.

Consider this from the perspective of the mapper-emitted keys: there are three of them for each input record, which means that on the reducer side each input record appears in three different groups. The reason is that we need to consider both the leading and the lagging records relative to each input record. For example, a record with time 10:30 is emitted under the hour keys 9:00, 10:00, and 11:00. As a result, each group has access to ALL input records that could possibly be within 60 minutes of its earliest record, as well as within 60 minutes of its latest one. Since the records are grouped by the first second of each hour, each hour-group covers records from 60 minutes before the start of that hour to 120 minutes after it, relative to any record that belongs to the given hour-group.