I have the following setup: the mapper outputs records with key type K1 and value type V1, where K1 is a WritableComparable. The combiner therefore receives K1 and Iterable<V1> as its input, aggregates the values, and outputs exactly one K1, V1 record. The reducer takes the combiners' output, again as K1 and Iterable<V1>. As I understand it, there must be exactly one K1, Iterable<V1> pair for each distinct K1 in the reduce phase. The reducer then outputs exactly one K2, V2 record. K2 is also a WritableComparable.
My problem is that I get multiple K2, V2 records for the same key in my output files, even within the same file! I have double-checked the compare methods of my key classes and they are correct. What is going wrong here? Do I also have to implement equals and hashCode? I thought equality was determined by comparing the keys and checking whether the result is 0.
Or is there something else I have forgotten?
Here are the key implementations:
The writable the key inherits from:
package somepackage;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class SomeWritable implements Writable {

    private String _string1;
    private String _string2;

    public SomeWritable() {
        super();
    }

    public String getString1() {
        return _string1;
    }

    public void setString1(final String string1) {
        _string1 = string1;
    }

    public String getString2() {
        return _string2;
    }

    public void setString2(final String string2) {
        _string2 = string2;
    }

    @Override
    public void write(final DataOutput out) throws IOException {
        out.writeUTF(_string1);
        out.writeUTF(_string2);
    }

    @Override
    public void readFields(final DataInput in) throws IOException {
        _string1 = in.readUTF();
        _string2 = in.readUTF();
    }
}
The key I use:
package somepackage;

import static org.apache.commons.lang.ObjectUtils.compare;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class SomeKey extends SomeWritable implements
        WritableComparable<SomeKey> {

    private String _someOtherString;

    public String getSomeOtherString() {
        return _someOtherString;
    }

    public void setSomeOtherString(final String someOtherString) {
        _someOtherString = someOtherString;
    }

    @Override
    public void write(final DataOutput out) throws IOException {
        super.write(out);
        out.writeUTF(_someOtherString);
    }

    @Override
    public void readFields(final DataInput in) throws IOException {
        super.readFields(in);
        _someOtherString = in.readUTF();
    }

    @Override
    public int compareTo(final SomeKey o) {
        if (o == null) {
            return 1;
        }
        if (o == this) {
            return 0;
        }
        final int c1 = compare(_someOtherString, o._someOtherString);
        if (c1 != 0) {
            return c1;
        }
        final int c2 = compare(getString1(), o.getString1());
        if (c2 != 0) {
            return c2;
        }
        return compare(getString2(), o.getString2());
    }
}
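For reference on the equals/hashCode part of the question: as far as I understand, grouping in the reduce phase goes through the key comparator (compareTo), but Hadoop's default HashPartitioner chooses the reduce partition as (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks. SomeKey does not override hashCode, so it inherits Object's identity-based hash, which is generally not the same for two keys with equal fields. The sketch below is plain Java with no Hadoop dependency; PartitionSketch, DemoKey and partitionFor are hypothetical names used only for illustration, and DemoKey is a simplified stand-in for SomeKey that assumes non-null fields. Whether this explains duplicates within a single output file is not something the sketch can show.

```java
import java.util.Objects;

public class PartitionSketch {

    // Same arithmetic as Hadoop's default HashPartitioner:
    // clear the sign bit, then take the remainder by the reducer count.
    public static int partitionFor(final Object key, final int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Hypothetical stand-in for SomeKey (no Writable plumbing, non-null fields assumed).
    public static final class DemoKey implements Comparable<DemoKey> {
        private final String string1;
        private final String string2;
        private final String someOtherString;

        public DemoKey(final String string1, final String string2,
                final String someOtherString) {
            this.string1 = string1;
            this.string2 = string2;
            this.someOtherString = someOtherString;
        }

        // Same field order as SomeKey.compareTo.
        @Override
        public int compareTo(final DemoKey o) {
            int c = someOtherString.compareTo(o.someOtherString);
            if (c != 0) {
                return c;
            }
            c = string1.compareTo(o.string1);
            if (c != 0) {
                return c;
            }
            return string2.compareTo(o.string2);
        }

        // equals and hashCode use the same fields as compareTo, so keys
        // that compare as 0 are equal and hash to the same value.
        @Override
        public boolean equals(final Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof DemoKey)) {
                return false;
            }
            final DemoKey o = (DemoKey) obj;
            return someOtherString.equals(o.someOtherString)
                    && string1.equals(o.string1)
                    && string2.equals(o.string2);
        }

        @Override
        public int hashCode() {
            return Objects.hash(someOtherString, string1, string2);
        }
    }

    public static void main(final String[] args) {
        final DemoKey a = new DemoKey("x", "y", "z");
        final DemoKey b = new DemoKey("x", "y", "z");
        System.out.println("compareTo == 0: " + (a.compareTo(b) == 0));
        System.out.println("same partition: "
                + (partitionFor(a, 4) == partitionFor(b, 4)));
    }
}
```

With a field-based hashCode, two separately constructed keys with equal fields are assigned to the same partition. With the inherited Object.hashCode there is no such guarantee.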
Writable which is not WritableComparable, something like public class K1 extends OtherK implements WritableComparable<K1>. Could this be the problem? - rabejens