
I am trying to customize a bulk-load MapReduce job into HBase, and I ran into issues with the reducer. At first I thought I hadn't written the reducer well, but after throwing a runtime exception in the reducer and seeing the job still succeed, I realized that the reducer is not running at all. So far I don't see anything wrong with respect to the common answers to this problem:

  1. My configuration sets the map output key/value classes and the final output key/value classes separately
  2. My reducer and mapper both have @Override
  3. My reduce method takes an Iterable, and its input is (ImmutableBytesWritable, Put), matching the map output, so...

Here's my code:

Driver

public int run(String[] args) throws Exception {
    int result=0;
    String outputPath = args[1];
    Configuration configuration = getConf();
    configuration.set("data.seperator", DATA_SEPERATOR);
    configuration.set("hbase.table.name",TABLE_NAME);
    configuration.set("COLUMN_FAMILY_1",COLUMN_FAMILY_1);
    Job job = new Job(configuration);
    job.setJarByClass(HBaseBulkLoadDriver.class);
    job.setJobName("Bulk Loading HBase Table::"+TABLE_NAME);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapperClass(HBaseBulkLoadMapper.class);
    job.setReducerClass(HBaseBulkLoadReducer.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    FileInputFormat.addInputPaths(job, args[0]);
    FileSystem.getLocal(getConf()).delete(new Path(outputPath), true);
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    job.setMapOutputValueClass(Put.class);
    job.setNumReduceTasks(1);
    HFileOutputFormat.configureIncrementalLoad(job, new HTable(configuration,TABLE_NAME));
    job.waitForCompletion(true);
    return result;
}

Mapper

public class HBaseBulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    private String hbaseTable;
    private String dataSeperator;
    private String columnFamily1;
    private ImmutableBytesWritable hbaseTableName;

    @Override
    public void setup(Context context) {
        Configuration configuration = context.getConfiguration();
        hbaseTable = configuration.get("hbase.table.name");
        dataSeperator = configuration.get("data.seperator");
        columnFamily1 = configuration.get("COLUMN_FAMILY_1");
        hbaseTableName = new ImmutableBytesWritable(Bytes.toBytes(hbaseTable));
    }
    @Override
    public void map(LongWritable key, Text value, Context context) {
        try {
            String[] values = value.toString().split(dataSeperator);
            String rowKey = values[0];
            Put put = new Put(Bytes.toBytes(rowKey));
            // BUNCH OF ADDS: put.add(...) calls for each column (omitted)
            context.write(new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put);
        } catch(Exception exception) {
            exception.printStackTrace();
        }
    }
}

Reducer

public class HBaseBulkLoadReducer extends Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, Put> {
    @Override
    protected void reduce(ImmutableBytesWritable row, Iterable<Put> puts, Context context)
            throws java.io.IOException, InterruptedException {
        TreeMap<String, KeyValue> map = new TreeMap<String, KeyValue>();
        int count = 0;
        Append nkv;
        byte[] tmp = "".getBytes();
        Put pp = new Put(tmp);
        try {
            for (Put p : puts) {
                byte[] r = "".getBytes();
                //KeyValue kv = new KeyValue(r);
                if (count != 0) {
                    r = p.getRow();
                    pp.add(new KeyValue(r));
                    //KeyValue k = map.get(row.toString());
                    //nkv = new Append(k.getRowArray());
                    //nkv = nkv.add(kv);
                    //map.put(row.toString(), k.clone());
                    //context.write(row, nkv);
                    //tmp = ArrayUtils.addAll(tmp, kv.getValueArray());
                    //map.put(row.toString(), new KeyValue(kv.getRowArray(), kv.getFamilyArray(), kv.getQualifierArray(), tmp));
                    count++;
                    throw new RuntimeException();
                } else {
                    r = p.getRow();
                    pp = new Put(row.toString().getBytes());
                    pp.add(new KeyValue(r));
                    //tmp = kv.clone().getValueArray();
                    //nkv = new Append(kv.getRowArray());
                    //map.put(row.toString(), kv.clone());
                    count++;
                    throw new RuntimeException();
                }
            }
            context.write(row, pp);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

Well, I know the reducer is kind of messy, but the thing is, it throws a RuntimeException in both the if and the else branch, as you can see, and the bulk load still succeeds, so I am quite sure the reducer is not running - and I am not sure why. All three classes are packaged by Maven into the same directory, FYI.
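
A quick way to verify whether reduce() is invoked at all, independent of the exceptions inside the loop, is to increment a Hadoop counter at the top of the method and read it back in the driver after the job finishes. This is only a minimal sketch assuming the job setup above; the counter group and name "debug" / "reduceCalls" are arbitrary.

// Inside HBaseBulkLoadReducer.reduce(), before touching the iterable,
// so even an empty Iterable<Put> still registers the call:
context.getCounter("debug", "reduceCalls").increment(1);

// In the driver, after job.waitForCompletion(true):
long reduceCalls = job.getCounters()
        .findCounter("debug", "reduceCalls")
        .getValue();
System.out.println("reduce() invoked " + reduceCalls + " times");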

Why do you implicitly throw new RuntimeException();? - OneCricketeer
He was trying to see if the block even executed... "but upon throwing runtime exception in reducer and seeing the code working, I realized that the reducer is not running at all" - Tgsmith61591
I think the reducer should run based on job.setNumReduceTasks(1) but if Iterable<Put> puts is empty, then the for loop of the reducer won't be entered and those exceptions won't be thrown - OneCricketeer
Are you sure the map phase executes successfully? - Tgsmith61591
The only thing I can imagine is that the for block is never entered, meaning the Iterable<Put> puts could be empty. - Tgsmith61591

1 Answer


Figured out what was wrong. configureIncrementalLoad sets the reducer class to PutSortReducer or KeyValueSortReducer according to the map output value class, so if I want to use a custom reducer class I have to set it after calling configureIncrementalLoad. After that I could see the reducer running. Just answering my own question so it may help people who run into the same problem.

HFileOutputFormat.configureIncrementalLoad(job, new HTable(configuration,TABLE_NAME));
job.setReducerClass(HBaseBulkLoadReducer.class);
job.waitForCompletion(true);
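
For reference, the relevant ordering in the driver ends up roughly like this. This is a sketch using the same class and table names as the question; configureIncrementalLoad also sets the partitioner and output format and adjusts the reduce-task count to match the table's regions.

Job job = new Job(configuration);
job.setJarByClass(HBaseBulkLoadDriver.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(HBaseBulkLoadMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
FileInputFormat.addInputPaths(job, args[0]);
FileOutputFormat.setOutputPath(job, new Path(outputPath));

// configureIncrementalLoad inspects the map output value class and installs
// PutSortReducer (or KeyValueSortReducer), so it silently replaces any
// reducer that was set before this call...
HFileOutputFormat.configureIncrementalLoad(job, new HTable(configuration, TABLE_NAME));

// ...which is why the custom reducer has to be set afterwards.
job.setReducerClass(HBaseBulkLoadReducer.class);

job.waitForCompletion(true);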