I'm currently exploring Bloom filters. I've gone through most of the blog posts on Bloom filters and understand what one is, but I still can't figure out an example of using one in a join.
Every article says that it will reduce network I/O, but none of them show how. One in particular was good, http://vanjakom.wordpress.com/tag/distributed-cache/, but it seemed too complex for me, as I've just started out with MapReduce.
Can anyone help me implement a Bloom filter in the example below (a reduce-side join)?
Two mappers read the user records and the department records, and a reducer joins them.
User Records
id, name
3738, Richie Gore
12946,Rony Sam
17556,David Gart
3443,Rachel Smith
5799,Paul Rosta
Department records
3738,Sales
12946,Marketing
17556,Marketing
3738,Sales
3738,Sales
Code
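import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Emits (user id, "A" + name); the "A" prefix tags the value as a user record.
public class UserMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
    private Text outkey = new Text();
    private Text outval = new Text();
    private String id, name;

    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        String line = value.toString();
        String arryUsers[] = line.split(",");
        id = arryUsers[0].trim();
        name = arryUsers[1].trim();
        outkey.set(id);
        outval.set("A" + name);
        output.collect(outkey, outval);
    }
}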
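import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Emits (user id, "B" + department); the "B" prefix tags the value as a department record.
public class DepartMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
    private Text Outk = new Text();
    private Text Outv = new Text();
    String depid, dep;

    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        String line = value.toString();
        String arryDept[] = line.split(",");
        depid = arryDept[0].trim();
        dep = arryDept[1].trim();
        Outk.set(depid);
        Outv.set("B" + dep);
        output.collect(Outk, Outv);
    }
}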
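And the reducer:
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class JoinReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
    private Text tmp = new Text();
    private ArrayList<Text> listA = new ArrayList<Text>();
    private ArrayList<Text> listB = new ArrayList<Text>();

    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        listA.clear();
        listB.clear();
        // Split the values for this id by their tag: "A" = user name, "B" = department.
        while (values.hasNext()) {
            tmp = values.next();
            if (tmp.charAt(0) == 'A') {
                listA.add(new Text(tmp.toString().substring(1)));
            } else if (tmp.charAt(0) == 'B') {
                listB.add(new Text(tmp.toString().substring(1)));
            }
        }
        executejoinlogic(output);
    }

    // Inner join: emit every (name, department) pair, but only if both sides are present.
    private void executejoinlogic(OutputCollector<Text, Text> output) throws IOException {
        if (!listA.isEmpty() && !listB.isEmpty()) {
            for (Text A : listA) {
                for (Text B : listB) {
                    output.collect(A, B);
                }
            }
        }
    }
}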
Is it possible to use a Bloom filter in the above scenario?
If yes, please help me implement it.
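To make the question concrete, here is a rough plain-Java sketch of what I understand the idea to be. It uses no Hadoop classes so that it can be run locally. The names SimpleBloomFilter and BloomJoinSketch, the 1024-bit size, and the 3 hash functions are all made up by me for illustration. My assumption is that in a real job the filter would be built from the department ids beforehand, shipped to the mappers (for example through the distributed cache), and the mightContain check would sit inside UserMapper.map() just before the collect call, so that users without a department never reach the shuffle.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical minimal Bloom filter: numHashes bit positions per key in a numBits-wide vector.
final class SimpleBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    SimpleBloomFilter(int numBits, int numHashes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    // Double hashing: the i-th position is (h1 + i * h2) mod numBits.
    private int position(String key, int i) {
        int h1 = key.hashCode();
        int h2 = (h1 >>> 16) | 1; // forced odd so the step is never zero
        return Math.floorMod(h1 + i * h2, numBits);
    }

    void add(String key) {
        for (int i = 0; i < numHashes; i++) {
            bits.set(position(key, i));
        }
    }

    // false means "definitely not added"; true means "possibly added" (false positives can occur).
    boolean mightContain(String key) {
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(position(key, i))) {
                return false;
            }
        }
        return true;
    }
}

final class BloomJoinSketch {
    // Build the filter from the join key (first column) of each department record.
    static SimpleBloomFilter buildFromDepartments(String[] departmentLines) {
        SimpleBloomFilter filter = new SimpleBloomFilter(1024, 3); // sizes are illustrative
        for (String line : departmentLines) {
            filter.add(line.split(",")[0].trim());
        }
        return filter;
    }

    // Stand-in for the user mapper: keep a record only if its id might have a department.
    static List<String> filterUsers(String[] userLines, SimpleBloomFilter filter) {
        List<String> emitted = new ArrayList<>();
        for (String line : userLines) {
            String id = line.split(",")[0].trim();
            if (filter.mightContain(id)) {
                emitted.add(line);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        String[] departments = {"3738,Sales", "12946,Marketing", "17556,Marketing", "3738,Sales", "3738,Sales"};
        String[] users = {"3738, Richie Gore", "12946,Rony Sam", "17556,David Gart",
                          "3443,Rachel Smith", "5799,Paul Rosta"};
        SimpleBloomFilter filter = buildFromDepartments(departments);
        List<String> emitted = filterUsers(users, filter);
        System.out.println(emitted.size() + " of " + users.length + " user records would be shuffled");
    }
}
```

With the sample data above, only the users with ids 3738, 12946 and 17556 pass the filter, so 3 of the 5 user records would be sent to the reducer. Since a Bloom filter can return false positives, I assume the reducer still has to do the real join check as it does now. Is this the right way to think about it?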