I am learning Hadoop. I have 2 Mappers both processing different files and 1 Reducer combining inputs from both Mappers.
Input: File 1:
1,Abc
2,Mno
3,Xyz
File 2:
1,CS
2,EE
3,CS
Expected output:
1 1,Abc,CS
2 2,Mno,EE
3 3,Xyz,CS
Getting Output:
1 1,,CS
2 2,Mno,
3 3,Xyz,
My Code:
Mapper 1:
public class NameMapper extends MapReduceBase implements
Mapper<LongWritable, Text, LongWritable, UserWritable> {
@Override
public void map(LongWritable key, Text value,
OutputCollector<LongWritable, UserWritable> output, Reporter reporter)
throws IOException {
String val[] = value.toString().split(",");
LongWritable id = new LongWritable(Long.parseLong(val[0]));
Text name = new Text(val[1]);
output.collect(id, new UserWritable(id, name, new Text("")));
}
}
Mapper2:
public class DepartmentMapper extends MapReduceBase implements
Mapper<LongWritable, Text, LongWritable, UserWritable> {
@Override
public void map(LongWritable key, Text value,
OutputCollector<LongWritable, UserWritable> output, Reporter reporter)
throws IOException {
String val[] = value.toString().split(",");
LongWritable id = new LongWritable(Integer.parseInt(val[0]));
Text department = new Text(val[1]);
output.collect(id, new UserWritable(id, new Text(""), department));
}
}
Reducer:
public class JoinReducer extends MapReduceBase implements
Reducer<LongWritable, UserWritable, LongWritable, UserWritable> {
@Override
public void reduce(LongWritable key, Iterator<UserWritable> values,
OutputCollector<LongWritable, UserWritable> output,
Reporter reporter) throws IOException {
UserWritable user = new UserWritable();
while (values.hasNext()) {
UserWritable u = values.next();
user.setId(u.getId());
if (!(u.getName().equals(""))) {
user.setName(u.getName());
}
if (!(u.getDepartment().equals(""))) {
user.setDepartment(u.getDepartment());
}
}
output.collect(user.getId(), user);
}
}
Driver:
public class Driver extends Configured implements Tool {
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(getConf(), Driver.class);
conf.setJobName("File Join");
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(UserWritable.class);
conf.setReducerClass(JoinReducer.class);
MultipleInputs.addInputPath(conf, new Path("/user/hadoop/join/f1"),
TextInputFormat.class, NameMapper.class);
MultipleInputs.addInputPath(conf, new Path("/user/hadoop/join/f2"),
TextInputFormat.class, DepartmentMapper.class);
Path output = new Path("/user/hadoop/join/output");
FileSystem.get(new URI(output.toString()), conf).delete(output);
FileOutputFormat.setOutputPath(conf, output);
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
int result = ToolRunner.run(new Configuration(), new Driver(), args);
System.exit(result);
}
}
UserWritable:
public class UserWritable implements Writable {
private LongWritable id;
private Text name;
private Text department;
public UserWritable() {
}
public UserWritable(LongWritable id, Text name, Text department) {
super();
this.id = id;
this.name = name;
this.department = department;
}
public LongWritable getId() {
return id;
}
public void setId(LongWritable id) {
this.id = id;
}
public Text getName() {
return name;
}
public void setName(Text name) {
this.name = name;
}
public Text getDepartment() {
return department;
}
public void setDepartment(Text department) {
this.department = department;
}
@Override
public void readFields(DataInput in) throws IOException {
id = new LongWritable(in.readLong());
name = new Text(in.readUTF());
department = new Text(in.readUTF());
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(id.get());
out.writeUTF(name.toString());
out.writeUTF(department.toString());
}
@Override
public String toString() {
return id.get() + "," + name.toString() + "," + department.toString();
}
}
Reducer should get 2 UserWritable objects for every UserId; 1st having id,name and 2nd having id,department. Can anyone explain where I made mistake?