I'm currently exploring Bloom filters. I've gone through most of the blog posts on Bloom filters and understand what they are, but I still can't work out an example of using one in a join.

Every article says that it will reduce network I/O, but none of them shows how. One in particular was good, http://vanjakom.wordpress.com/tag/distributed-cache/, but it seemed too complex for me, as I've only just started out with MapReduce.

Can anyone help me implement a Bloom filter in the example below (a reduce-side join)?

There are 2 mappers, one reading user records and one reading department records, and a reducer that joins them.

User Records

id, name

3738, Richie Gore

12946,Rony Sam

17556,David Gart

3443,Rachel Smith

5799,Paul Rosta

Department records

3738,Sales

12946,Marketing

17556,Marketing

3738,Sales

3738,Sales

Code

public class UserMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>{

 private Text outkey = new Text();
 private Text outval = new Text();
 private String id, name;

public void map (LongWritable key, Text value, OutputCollector<Text, Text> ouput,Reporter reporter)
             throws IOException {

     String line = value.toString();
     String arryUsers[] = line.split(",");
     id = arryUsers[0].trim();
     name = arryUsers[1].trim();

     outkey.set(id);
     outval.set("A"+ name);
     ouput.collect(outkey, outval);
   }
    }

public class DepartMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

private Text Outk = new Text();
private Text Outv = new Text();
String depid, dep ;

public void map (LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {

    String line = value.toString();
    String arryDept[] = line.split(",");
    depid = arryDept[0].trim();
    dep = arryDept[1].trim();

    Outk.set(depid);
    Outv.set("B" + dep);

    output.collect(Outk, Outv);
}
    }

and Reducer

public class JoinReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

private Text tmp = new Text();
private ArrayList<Text> listA = new ArrayList<Text>();
private ArrayList<Text> listB = new ArrayList<Text>();

public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text>output, Reporter reporter) throws IOException {

    listA.clear();
    listB.clear();

    while (values.hasNext()) {

        tmp = values.next();
        if (tmp.charAt(0) == 'A') {
            listA.add(new Text(tmp.toString().substring(1)));
        } else if (tmp.charAt(0) == 'B') {
            listB.add(new Text(tmp.toString().substring(1)));
        }



    }
    executejoinlogic(output);

}

private void executejoinlogic(OutputCollector<Text, Text> output) throws IOException {

    if (!listA.isEmpty() && !listB.isEmpty()) {
        for (Text A : listA) {
            for (Text B : listB) {
                output.collect(A, B);
            }
        }
    }
}
}

Is it possible to use a Bloom filter in the above scenario?

If yes, please help me implement it.

1 Answer

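You can use a Bloom filter here, but it only pays off if one of your two input tables is much smaller than the other. That is also where the network I/O saving comes from: records of the larger table that are dropped in the mapper are never shuffled to the reducer. The process to follow is:

  1. Initialise a Bloom filter in the setup() method of the Mapper class (in the old org.apache.hadoop.mapred API that your code uses, the equivalent hook is configure(JobConf)). The filter object itself should be a field of the mapper so that the map() method can access it later:

    filter = new BloomFilter(VECTOR_SIZE, NB_HASH, HASH_TYPE);

  2. Read the smaller table in the setup() method of the Mapper, for example from a file shipped through the distributed cache.

  3. Add the ID of each record to the Bloom filter (Hadoop's org.apache.hadoop.util.bloom.BloomFilter takes a Key, not a String):

    filter.add(new Key(ID.getBytes()));

  4. In the map() method itself, call filter.membershipTest(new Key(ID.getBytes())) on every ID from the larger input source. If it returns false, you know that the ID isn't present in your smaller dataset, so the record shouldn't be passed to the reducer.

  5. Remember that a Bloom filter can return false positives, so some records without a match will still reach the reducer. Keep the join logic there as it is, and don't assume everything that arrives will be joined.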
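
To make steps 1 to 5 concrete without a Hadoop cluster, here is a minimal sketch in plain Java. It is not the Hadoop API: SimpleBloomFilter, buildFilter, filterLargeTable and BloomJoinSketch are hypothetical names made up for this illustration, the vector size (1024) and hash count (3) are arbitrary, and the department records are assumed to be the smaller table. In a real job you would most likely use Hadoop's own BloomFilter and Key classes inside the mapper instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

class BloomJoinSketch {

    /** Hypothetical stand-in for Hadoop's BloomFilter: a bit vector probed by nbHash hash functions. */
    static final class SimpleBloomFilter {
        private final BitSet bits;
        private final int vectorSize;
        private final int nbHash;

        SimpleBloomFilter(int vectorSize, int nbHash) {
            this.bits = new BitSet(vectorSize);
            this.vectorSize = vectorSize;
            this.nbHash = nbHash;
        }

        /** FNV-1a style hash over the key bytes, started from the given seed. */
        private static int hash(byte[] data, int seed) {
            int h = seed;
            for (byte b : data) {
                h ^= (b & 0xff);
                h *= 16777619;
            }
            return h;
        }

        /** Bit position for the i-th hash function, derived by double hashing. */
        private int position(String key, int i) {
            byte[] data = key.getBytes(StandardCharsets.UTF_8);
            int h1 = hash(data, 0x811C9DC5);
            int h2 = hash(data, 0x01000193) | 1;
            return Math.floorMod(h1 + i * h2, vectorSize);
        }

        void add(String key) {
            for (int i = 0; i < nbHash; i++) {
                bits.set(position(key, i));
            }
        }

        /** false means "definitely absent"; true means "possibly present". */
        boolean membershipTest(String key) {
            for (int i = 0; i < nbHash; i++) {
                if (!bits.get(position(key, i))) {
                    return false;
                }
            }
            return true;
        }
    }

    /** Steps 1 to 3: create the filter and add the ID of every record of the smaller table. */
    static SimpleBloomFilter buildFilter(List<String> smallTableLines, int vectorSize, int nbHash) {
        SimpleBloomFilter filter = new SimpleBloomFilter(vectorSize, nbHash);
        for (String line : smallTableLines) {
            filter.add(line.split(",")[0].trim());
        }
        return filter;
    }

    /** Step 4: keep only the records of the larger table whose ID might be in the smaller one. */
    static List<String> filterLargeTable(List<String> largeTableLines, SimpleBloomFilter filter) {
        List<String> kept = new ArrayList<>();
        for (String line : largeTableLines) {
            String id = line.split(",")[0].trim();
            if (filter.membershipTest(id)) {
                kept.add(line); // in a real mapper this is where the record would be emitted
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> departments = Arrays.asList(
                "3738,Sales", "12946,Marketing", "17556,Marketing", "3738,Sales", "3738,Sales");
        List<String> users = Arrays.asList(
                "3738, Richie Gore", "12946,Rony Sam", "17556,David Gart",
                "3443,Rachel Smith", "5799,Paul Rosta");

        SimpleBloomFilter filter = buildFilter(departments, 1024, 3);
        // Step 5: anything printed here may still be a false positive; the reducer does the real join.
        for (String line : filterLargeTable(users, filter)) {
            System.out.println(line);
        }
    }
}
```

The three users that have a department are always kept, because a Bloom filter never gives false negatives. The other two users (3443 and 5799) are dropped unless they happen to hit a false positive, and only the kept records would be sent over the network to the reducer.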