
Here is my reducer. It takes EdgeWritable keys and NullWritable values.

EdgeWritable holds 4 integers, e.g. <71, 74, 7, 2000>, meaning a communication from 71 (FromID) to 74 (ToID) in 7 (July) 2000 (year).
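Hadoop groups the map output into reduce() calls by comparing keys, and for a WritableComparable key that comparison defaults to its compareTo(). If compareTo() reports different edges as equal, they can end up in a single reduce() call. EdgeWritable itself isn't shown, so the sketch below uses a hypothetical plain-Java Edge class (no Hadoop dependency) to illustrate a comparison that orders by all four fields in the order from, to, month, year. A real EdgeWritable would additionally implement write() and readFields().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical plain-Java stand-in for EdgeWritable (no Hadoop dependency).
// Field order follows the question: from, to, month, year.
final class Edge implements Comparable<Edge> {
    final int from, to, month, year;

    Edge(int from, int to, int month, int year) {
        this.from = from;
        this.to = to;
        this.month = month;
        this.year = year;
    }

    // Lexicographic comparison over all four fields: two edges compare
    // equal only if every field matches.
    @Override
    public int compareTo(Edge o) {
        int c = Integer.compare(from, o.from);
        if (c != 0) return c;
        c = Integer.compare(to, o.to);
        if (c != 0) return c;
        c = Integer.compare(month, o.month);
        if (c != 0) return c;
        return Integer.compare(year, o.year);
    }

    // Kept consistent with compareTo().
    @Override
    public boolean equals(Object obj) {
        return obj instanceof Edge && compareTo((Edge) obj) == 0;
    }

    @Override
    public int hashCode() {
        return Objects.hash(from, to, month, year);
    }

    @Override
    public String toString() {
        return "<" + from + ", " + to + ", " + month + ", " + year + ">";
    }
}

class EdgeDemo {
    public static void main(String[] args) {
        List<Edge> edges = new ArrayList<>();
        edges.add(new Edge(71, 74, 7, 2000));
        edges.add(new Edge(71, 74, 6, 2000));
        edges.add(new Edge(12, 74, 7, 2000));
        Collections.sort(edges);
        // prints [<12, 74, 7, 2000>, <71, 74, 6, 2000>, <71, 74, 7, 2000>]
        System.out.println(edges);
    }
}
```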

The mapper emits 10787 such records to the reducer, but the reducer outputs only 1.

I need to output 44 files, one for each of the 44 months between October 1998 and July 2002. Each file should be named "out"+month+year; for example, the July 2002 records go in the file out72002.
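Testing month and year separately is easy to get wrong: the condition in the reducer below, (month >= 9 && year >= 1997) || (month <= 6 && year <= 2001), also accepts, for example, March of any year up to 2001. A minimal sketch, assuming the range is October 1998 to July 2002 inclusive as stated above, maps each (month, year) pair to a single month index and compares that instead. MonthSlices, monthIndex, inRange and baseName are hypothetical names. In the reducer, the base name would be the third argument to multipleOutputs.write(), and MultipleOutputs typically appends a suffix such as -r-00000 to it.

```java
// Hypothetical helpers (not from the question) for the Oct 1998 .. July 2002 range.
final class MonthSlices {
    private MonthSlices() {}

    // Collapses (month, year) into one increasing number, so a date range
    // becomes a single integer comparison instead of separate month/year tests.
    static int monthIndex(int month, int year) {
        return year * 12 + (month - 1);
    }

    static final int FIRST = monthIndex(10, 1998); // October 1998
    static final int LAST = monthIndex(7, 2002);   // July 2002

    // True if the month lies within the inclusive range.
    static boolean inRange(int month, int year) {
        int idx = monthIndex(month, year);
        return idx >= FIRST && idx <= LAST;
    }

    // Base output name in the question's format: "out" + month + year.
    static String baseName(int month, int year) {
        return "out" + month + year;
    }

    public static void main(String[] args) {
        System.out.println(inRange(7, 2002));  // true
        System.out.println(inRange(8, 2002));  // false
        System.out.println(inRange(3, 1950));  // false
        System.out.println(baseName(7, 2002)); // out72002
    }
}
```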

I have debugged the code: after one iteration it writes one file and stops without processing the next record. How should I use MultipleOutputs here? Thanks.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class MultipleOutputReducer extends Reducer<EdgeWritable, NullWritable, IntWritable, IntWritable> {
    private MultipleOutputs<IntWritable, IntWritable> multipleOutputs;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        multipleOutputs = new MultipleOutputs<IntWritable, IntWritable>(context);
    }

    @Override
    public void reduce(EdgeWritable key, Iterable<NullWritable> val, Context context)
            throws IOException, InterruptedException {
        // EdgeWritable fields: <from, to, month, year>
        int year = key.get(3).get();
        int month = key.get(2).get();
        int to = key.get(1).get();
        int from = key.get(0).get();

        //if(year >= 1997 && year <= 2001){
            if ((month >= 9 && year >= 1997) || (month <= 6 && year <= 2001)) {
                // Write to a file whose base name is "out" + month + year, e.g. out72000
                multipleOutputs.write(new IntWritable(from), new IntWritable(to), "out" + month + year);
            }
        //}
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        // Flush and close every file opened through MultipleOutputs
        multipleOutputs.close();
    }
}

Driver

import java.text.SimpleDateFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

public class TimeSlicingDriver extends Configured implements Tool {

    static final SimpleDateFormat sdf = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss Z");

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: <input path> <output path>");
            return -1;
        }

        Configuration setup = new Configuration();
        Job job = Job.getInstance(setup, "Time Slicing");
        job.setJarByClass(TimeSlicingDriver.class);

        job.setMapperClass(TimeSlicingMapper.class);
        job.setReducerClass(MultipleOutputReducer.class);

        //MultipleOutputs.addNamedOutput(setup, "output", org.apache.hadoop.mapred.TextOutputFormat.class, EdgeWritable.class, NullWritable.class);

        // Map output: EdgeWritable key, NullWritable value
        job.setMapOutputKeyClass(EdgeWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Reduce output: IntWritable from-ID, IntWritable to-ID
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        // Set the input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}

1 Answer


You are not iterating over your Iterable "val", so the logic in your reduce method is executed only once per key group.
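To make this concrete: in the new (mapreduce) API, Hadoop reuses the key object passed to reduce() and refills it each time the values iterator advances. If many records fall into one key group (a single output from 10787 records suggests that happened here, for example if EdgeWritable.compareTo() returns 0 for different edges), reading key only once sees just the first record. The sketch below is a simplified plain-Java simulation of that behaviour, not Hadoop code; KeyReuseDemo, MutableEdge and the reduce variants are hypothetical names. In the real reducer, the corresponding change would be to move the key.get(...) reads and the multipleOutputs.write(...) call inside for (NullWritable v : val).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Simplified simulation (not Hadoop code): one key group is passed to reduce()
// as a shared, mutable key plus an Iterable of values. Advancing the value
// iterator overwrites the key with the current record.
final class KeyReuseDemo {
    // Hypothetical mutable stand-in for EdgeWritable: {from, to, month, year}.
    static final class MutableEdge {
        int[] fields = new int[4];
    }

    // Values carry no data (like NullWritable); next() refreshes the shared key.
    static Iterable<Object> values(List<int[]> records, MutableEdge key) {
        return () -> new Iterator<Object>() {
            private int i = 0;

            @Override
            public boolean hasNext() {
                return i < records.size();
            }

            @Override
            public Object next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                key.fields = records.get(i++).clone();
                return null;
            }
        };
    }

    // Like the question's reducer: reads the key once, so one result per group.
    static List<String> reduceOnce(MutableEdge key, Iterable<Object> values) {
        List<String> out = new ArrayList<>();
        out.add("out" + key.fields[2] + key.fields[3]);
        return out;
    }

    // As the answer suggests: reads the key inside the value loop.
    static List<String> reduceLoop(MutableEdge key, Iterable<Object> values) {
        List<String> out = new ArrayList<>();
        for (Object ignored : values) {
            out.add("out" + key.fields[2] + key.fields[3]);
        }
        return out;
    }

    // Feeds all records to reduce() as a single group.
    static List<String> run(List<int[]> records, boolean iterate) {
        MutableEdge key = new MutableEdge();
        key.fields = records.get(0).clone(); // key already holds the first record
        Iterable<Object> vals = values(records, key);
        return iterate ? reduceLoop(key, vals) : reduceOnce(key, vals);
    }

    public static void main(String[] args) {
        List<int[]> records = Arrays.asList(
                new int[] {71, 74, 7, 2000},
                new int[] {71, 75, 8, 2000},
                new int[] {72, 74, 9, 2001});
        System.out.println(run(records, false)); // [out72000]
        System.out.println(run(records, true));  // [out72000, out82000, out92001]
    }
}
```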