2
votes

I want to query HBase for the records matching a provided list of row keys. Since mappers in MapReduce run in parallel, I would like to use them for this.

The input list will contain roughly 100,000 row keys. I have written a custom InputFormat that hands a list of 1,000 row keys to each mapper for querying the HBase table. Some of the requested keys may not exist in the table, and I want to return only the records that are present.
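For reference, the batching amounts to something like the following. This is only a minimal sketch in plain Java, not the actual InputFormat: the class name RowKeyBatcher, the partition method and the "row-N" key format are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class RowKeyBatcher {

    /** Splits rowKeys into consecutive batches of at most batchSize keys. */
    static List<List<String>> partition(List<String> rowKeys, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<String>> batches = new ArrayList<>();
        for (int start = 0; start < rowKeys.size(); start += batchSize) {
            int end = Math.min(start + batchSize, rowKeys.size());
            // Copy the subList view so each batch is independent of the source list
            batches.add(new ArrayList<>(rowKeys.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> rowKeys = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) {
            rowKeys.add("row-" + i); // hypothetical key format
        }
        List<List<String>> batches = partition(rowKeys, 1_000);
        System.out.println(batches.size() + " batches, first has "
                + batches.get(0).size() + " keys");
    }
}
```

Each batch would then become the input of one mapper.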

The examples I have seen all use a table scan to read a range of row keys, with the range given by a start row key and an end row key. I want to query only for the row keys in my list.

How can I do this with MapReduce? Any help is welcome!


2 Answers

1
votes

Since you pass a list of row keys to each mapper, you should issue get requests to HBase. Each get returns the data for the requested key, or an empty result if the key doesn't exist.

First, create a Connection and a Table instance in the setup() method of your mapper. Keep the Connection in a field so that it can be closed later:

private Connection connection;
private Table table;

@Override
protected void setup(Context context) throws IOException, InterruptedException {
    // Start from the job configuration so settings passed to the job are honored
    Configuration hbaseConfig = HBaseConfiguration.create(context.getConfiguration());
    this.connection = ConnectionFactory.createConnection(hbaseConfig);
    this.table = connection.getTable(TableName.valueOf("hbaseTable"));
}

Then, in the map() method, issue a get request for each key using Get and Result instances:

String key = "keyString";
Get get = new Get(Bytes.toBytes(key));

// optionally restrict the request to a column family and column qualifier
get.addColumn(Bytes.toBytes("columnFamily"), Bytes.toBytes("columnQual"));

// a single call is enough: an empty Result means the row was not found,
// so there is no need for a second round trip with table.exists()
Result result = table.get(get);
if (result.isEmpty()) {
    // requested key doesn't exist (use continue instead if you loop over several keys)
    return;
}

// do what you want with the result instance

When the mapper has finished its work, close the table and the connection in the cleanup() method:

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
    table.close();
    connection.close();
}

You can also pass the results of the get requests on to reducers, or collect them in the mapper and emit them from cleanup(). Which one to choose depends on what you need.

2
votes

You can use a method like this in your mapper; it worked well for me. It sends all the gets as one batch and returns an array of Result.

/**
 * Fetches the records for the given row keys with a single batched get.
 *
 * @param listOfRowKeys row keys to look up
 * @return one Result per requested key; a Result is empty if its row does not exist
 * @throws IOException if the request fails
 */
private Result[] getDetailRecords(final List<String> listOfRowKeys) throws IOException {
    final HTableInterface table = HBaseConnection.getHTable(TBL_DETAIL);
    final List<Get> listOfGets = new ArrayList<Get>(listOfRowKeys.size());
    try {
        for (final String rowkey : listOfRowKeys) { // prepare a batch of gets, one per row key
            // System.err.println("get 'yourtablename', '" + saltIndexPrefix + rowkey + "'");
            final Get get = new Get(Bytes.toBytes(saltedRowKey(rowkey)));
            get.addColumn(COLUMN_FAMILY, Bytes.toBytes(yourcolumnname));
            listOfGets.add(get);
        }
        return table.get(listOfGets);
    } finally {
        table.close();
    }
}
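
Since you only want the records that actually exist, you still have to drop the empty entries from the returned array, because as far as I know a batched get returns an empty Result for a row that is not in the table. A minimal sketch of that filtering step is below. It is written generically so that it runs without HBase on the classpath: PresentResults and keepPresent are names I made up, and plain strings stand in for Result objects in main.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class PresentResults {

    /** Returns the non-null entries of results for which isEmpty is false, in order. */
    static <R> List<R> keepPresent(R[] results, Predicate<R> isEmpty) {
        List<R> present = new ArrayList<>();
        for (R result : results) {
            if (result != null && !isEmpty.test(result)) {
                present.add(result);
            }
        }
        return present;
    }

    public static void main(String[] args) {
        // Strings stand in for HBase Result objects here;
        // "" plays the role of an empty Result (row not found)
        String[] fakeResults = {"a", "", "b", null};
        System.out.println(keepPresent(fakeResults, String::isEmpty));
    }
}
```

With HBase the call would look like keepPresent(getDetailRecords(keys), Result::isEmpty), and the mapper then writes only the entries of the returned list to its context.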