
I am new to HBase. I am trying to save multiple versions of a value in a single cell, but I only ever get back the last value written. I tried the following two commands to retrieve the saved versions:

get 'Dummy1', 'abc', {COLUMN => 'backward:first', VERSIONS => 12}
scan 'Dummy1', {VERSIONS => 12}

Both returned the following output:

ROW                   COLUMN+CELL                                               
 abc                  column=backward:first, timestamp=1422722312845, value=rrb 

1 row(s) in 0.0150 seconds

The input file is as follows:

abc xyz kkk
abc qwe asd
abc anf rrb

The code for Table creation in HBase is as follows:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class HBaseTableCreator {

  public static void main(String[] args) throws Exception {

      // HBaseConfiguration.create() is the preferred way to build the config
      // (the HBaseConfiguration constructor is deprecated)
      Configuration conf = HBaseConfiguration.create();
      conf.set("hbase.master", "localhost:60000");

      HBaseAdmin hbase = new HBaseAdmin(conf);
      HTableDescriptor desc = new HTableDescriptor("Dummy");

      // Keep every version ever written in both column families
      HColumnDescriptor meta = new HColumnDescriptor("backward".getBytes());
      meta.setMaxVersions(Integer.MAX_VALUE);
      HColumnDescriptor prefix = new HColumnDescriptor("forward".getBytes());
      prefix.setMaxVersions(Integer.MAX_VALUE);

      desc.addFamily(meta);
      desc.addFamily(prefix);
      hbase.createTable(desc);
      hbase.close();

 }

}

The code to dump the data into HBase is as follows.

Main Class:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


public class TestMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException
    {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2)
        {
            System.err.println("Usage: TestMain <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "HBase dummy dump");
        job.setJarByClass(TestMain.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(TestMapper.class);
        // Map-only job: initTableReducerJob with a null reducer class sets up
        // TableOutputFormat so the mapper's Puts go straight to the "Dummy" table
        TableMapReduceUtil.initTableReducerJob("Dummy", null, job);
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    }
}

Mapper Class:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;

public class TestMapper extends Mapper<LongWritable, Text, Text, Put> {
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // l[0] is the rowkey, the remaining tokens are the values to store
        String line = value.toString();
        String[] l = line.split("\\s+");
        for (int i = 1; i < l.length; i++)
        {
            // Every Put targets the same cell (backward:first), so each write
            // can only become a separate version if its timestamp differs
            Put HPut = new Put(l[0].getBytes());
            HPut.add("backward".getBytes(), "first".getBytes(), l[i].getBytes());
            context.write(new Text(l[0]), HPut);
        }
    }
}

Please tell me where I'm going wrong.

1 Answer

Your problem is that your writes are being batched automatically and flushed at the end of the job (when the table is closed). This most likely gives every Put the exact same timestamp, so they overwrite each other: writing a version with the same timestamp as an existing one replaces that version instead of adding a new one.
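
To see that overwrite behavior in isolation, here is a minimal sketch using the same old-style HTable API as your code (the class name SameTimestampDemo is mine; it assumes the Dummy table from your creation code already exists):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;

public class SameTimestampDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "Dummy");

        // Two puts to the same cell with an identical explicit timestamp:
        // the second one replaces the first instead of adding a version.
        long ts = System.currentTimeMillis();

        Put first = new Put("abc".getBytes());
        first.add("backward".getBytes(), "first".getBytes(), ts, "xyz".getBytes());
        table.put(first);

        Put second = new Put("abc".getBytes());
        second.add("backward".getBytes(), "first".getBytes(), ts, "rrb".getBytes());
        table.put(second); // same (row, family, qualifier, timestamp) -> overwrite

        table.close(); // a scan with VERSIONS => 12 will show only "rrb"
    }
}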

A first attempt at a fix would be to provide the timestamp yourself, with Put HPut = new Put(l[0].getBytes(), System.currentTimeMillis()); but you will probably hit the same issue, because the loop runs so fast that many puts will still share the same millisecond.

This is what I would do to overcome this:

1- Stop using TableMapReduceUtil.initTableReducerJob in favour of a custom reducer that handles the writes to the HBase table.

2- Modify the mapper to write all the values of each row to the context, so they get grouped into an iterable and passed together to the reducer (e.g., key abc with values xyz, kkk, qwe, asd, anf, rrb). A Java sketch of both the mapper and the reducer follows the pseudocode below.

3- Implement your own reducer to work somewhat like this pseudocode:

Define myHTable
setup() {
  Instantiate myHTable
  Disable myHTable autoflush to prevent puts from being automatically flushed
  Set myHTable write buffer to at least 2MB
}
reduce(rowkey, results) {
  baseTimestamp = current time in milliseconds
  Iterate results {
     Instantiate put with rowkey and ++baseTimestamp (increment so every version gets its own timestamp)
     Add result to put
     Send put to myHTable
  }
}
cleanup() {
  Flush commits for myHTable
  Close myHTable
}
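
For reference, here is a rough Java version of steps 2 and 3, staying with the same old-style API as your code (the class names GroupingMapper and VersionWritingReducer are mine; treat this as a sketch to adapt, not a drop-in):

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Step 2: emit (rowkey, value) pairs so all values of a row
// arrive at a single reduce() call as one Iterable
class GroupingMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] l = value.toString().split("\\s+");
        for (int i = 1; i < l.length; i++) {
            context.write(new Text(l[0]), new Text(l[i]));
        }
    }
}

// Step 3: write the versions ourselves, bumping the timestamp by 1 ms
// per value so no two versions of the same cell collide
class VersionWritingReducer extends Reducer<Text, Text, Text, Text> {
    private HTable table;

    @Override
    protected void setup(Context context) throws IOException {
        table = new HTable(HBaseConfiguration.create(context.getConfiguration()), "Dummy");
        table.setAutoFlush(false);                 // buffer puts client-side
        table.setWriteBufferSize(2 * 1024 * 1024); // at least 2 MB
    }

    @Override
    protected void reduce(Text rowkey, Iterable<Text> values, Context context)
            throws IOException {
        long ts = System.currentTimeMillis();
        for (Text v : values) {
            Put put = new Put(rowkey.toString().getBytes(), ++ts); // unique timestamp per version
            put.add("backward".getBytes(), "first".getBytes(), v.toString().getBytes());
            table.put(put);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException {
        table.flushCommits(); // push whatever is still buffered
        table.close();
    }
}

In the driver you would then drop the initTableReducerJob call and setNumReduceTasks(0), and instead set job.setMapperClass(GroupingMapper.class), job.setReducerClass(VersionWritingReducer.class), and the map output classes via job.setMapOutputKeyClass(Text.class) and job.setMapOutputValueClass(Text.class).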

That way there will always be at least 1 ms between versions. The only thing to be careful about: if you have a huge number of versions and run the same job multiple times, the timestamps of the new job could overlap those of a previous one. With fewer than 30,000 versions you should not need to worry, because the jobs would have to start less than 30 seconds apart for their timestamp ranges to collide (30,000 versions at 1 ms each spans 30 seconds).

Anyway, be warned that keeping more than about a hundred versions is not recommended (http://hbase.apache.org/book.html#versions). If you need more, it would be wiser to go for a tall design (a compound rowkey containing key + timestamp) with no versions at all.
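
A minimal sketch of what a tall put could look like (the "key|timestamp" rowkey layout here is my own choice for illustration, not something prescribed by HBase):

// One row per (key, timestamp) pair instead of one versioned cell;
// "table" is an open HTable, as in the sketches above
long ts = System.currentTimeMillis();
Put put = new Put(("abc" + "|" + ts).getBytes());
put.add("backward".getBytes(), "first".getBytes(), "rrb".getBytes());
table.put(put); // scan with a PrefixFilter on "abc|" to read back all "versions"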
