0
votes

I have the following setup: The mapper outputs records with key type K1 and value type V1, K1 being WritableComparable. The combiner thus gets K1 and Iterable<V1> as its input. It then does an aggregation and outputs exactly one K1, V1 record. The reducer takes the input from the combiners, again being K1, Iterable<V1>. To my understanding, there must exist exactly one K1, Iterable<V1> pair for each individual K1 at the Reduce phase. The reducer then outputs exactly one K2, V2. K2 is WritableComparable again.

My problem now is: I get multiple K2, V2 in my output files, even in the same file! The compare methods of my key classes are correct, I double-checked it. What is going wrong here? Do I also have to implement equals and hashCode? I thought equality is carried-out via comparing and checking if the compare result is 0.

Or are there other things I forgot?

Here are the key implementations:

The writable the key inherits from:

package somepackage;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class SomeWritable implements Writable {

        private String _string1;
        private String _string2;

        public SomeWritable() {
                super();
        }

        public String getString1() {
                return _string1;
        }

        public void setString1(final String string1) {
                _string1 = string1;
        }

        public String getString2() {
                return _string2;
        }

        public void setString2(final String string2) {
                _string2 = string2;
        }

        @Override
        public void write(final DataOutput out) throws IOException {
                out.writeUTF(_string1);
                out.writeUTF(_string2);
        }

        @Override
        public void readFields(final DataInput in) throws IOException {
                _string1 = in.readUTF();
                _string2 = in.readUTF();
        }
}

The key I use:

package somepackage;

import static org.apache.commons.lang.ObjectUtils.compare;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class SomeKey extends SomeWritable implements
                WritableComparable<SomeKey> {

        private String _someOtherString;

        public String getSomeOtherString() {
                return _someOtherString;
        }

        public void setSomeOtherString(final String someOtherString) {
                _someOtherString = someOtherString;
        }

        @Override
        public void write(final DataOutput out) throws IOException {
                super.write(out);
                out.writeUTF(_someOtherString);
        }

        @Override
        public void readFields(final DataInput in) throws IOException {
                super.readFields(in);
                _someOtherString = in.readUTF();
        }

        @Override
        public int compareTo(final SomeKey o) {
                if (o == null) {
                        return 1;
                }
                if (o == this) {
                        return 0;
                }
                final int c1 = compare(_someOtherString, o._someOtherString);
                if (c1 != 0) {
                        return c1;
                }
                final int c2 = compare(getString1(), o.getString1());
                if (c2 != 0) {
                        return c2;
                }
                return compare(getString2(), o.getString2());
        }
}
1
show us your implementation of Key. - SMA
Unfortunately I am not allowed to, as it is company code. But the key extends another Writable which is not WritableComparable, something like public class K1 extends OtherK implements WritableComparable<K1>. Could this be the problem? - rabejens
mask the things, change variables/name/package etc. and paste without it we cant help. - SMA

1 Answers

0
votes

I solved the problem: to make sure the same key is always distributed to the same reducer, hashCode() for the key must be implemented based on the current values in the key. Even if they are mutable. With this in place, everything works.

One must then be extremely careful not using these types in sets or as keys in maps etc.