I want to scan an HBase table from Spark. My code is as follows.

public void start() throws IOException {
    SparkConf conf = new SparkConf().setAppName("Simple Application");
    JavaSparkContext sc = new JavaSparkContext(conf);

    Configuration hbaseConf = HBaseConfiguration.create();

    Scan scan = new Scan();
    scan.setStartRow(Bytes.toBytes("0001"));
    scan.setStopRow(Bytes.toBytes("0004"));
    scan.addFamily(Bytes.toBytes("DATA"));
    scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("TIME"));
    ClientProtos.Scan proto = ProtobufUtil.toScan(scan);

    String scanStr = Base64.encodeBytes(proto.toByteArray());

    String tableName = "rdga_by_id";
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName);
    hbaseConf.set(TableInputFormat.SCAN, scanStr);

    JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

    System.out.println("here: " + hBaseRDD.count());

    PairFunction<Tuple2<ImmutableBytesWritable, Result>, Integer, Integer> pairFunc =
            new PairFunction<Tuple2<ImmutableBytesWritable, Result>, Integer, Integer>() {
        @Override
        public Tuple2<Integer, Integer> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {

            byte[] time = immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("DATA"), Bytes.toBytes("TIME"));
            byte[] id = /* I want to get Row Key here */
            if (time != null && id != null) {
                return new Tuple2<Integer, Integer>(byteArrToInteger(id), byteArrToInteger(time));
            }
            else {
                return null;
            }
        }
    };
}

Now I want to get the row key of each result. But I can only set family and column in the scan. How can I get the row key? Is there any function or method like result.getRowkey() that I can use with the JavaPairRDD? Or how should I set the Scan in order to keep row key in the result?

Thanks in advance!

1 Answer

The result already contains your row key. The key of each pair in the RDD, the ImmutableBytesWritable, is the row key. You only have to convert it back into a String, like this:

String rowKey = new String(immutableBytesWritableResultTuple2._1.get());
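Since the scan in the question uses keys such as Bytes.toBytes("0001"), the row key bytes are presumably UTF-8 text rather than a 4-byte binary integer. The following is a minimal sketch of decoding such a key, assuming the keys really are digit strings. It uses only the JDK so it runs without HBase or Spark; the class RowKeyDecoding and its methods are hypothetical names for illustration, and the offset/length parameters stand in for the values an ImmutableBytesWritable reports through getOffset() and getLength().

```java
import java.nio.charset.StandardCharsets;

// Stand-alone sketch: plain JDK only, no HBase or Spark classes involved.
// Assumption: row keys were written as UTF-8 digit strings such as "0001".
class RowKeyDecoding {

    // Decode the key bytes as UTF-8 text, honouring offset and length so that
    // a key stored inside a larger backing array is read correctly.
    static String rowKeyToString(byte[] buf, int offset, int length) {
        return new String(buf, offset, length, StandardCharsets.UTF_8);
    }

    // Hypothetical stand-in for byteArrToInteger when the key is a digit string:
    // parse the text instead of reinterpreting the raw bytes as a binary int.
    static int rowKeyToInt(byte[] buf, int offset, int length) {
        return Integer.parseInt(rowKeyToString(buf, offset, length));
    }

    public static void main(String[] args) {
        byte[] key = "0001".getBytes(StandardCharsets.UTF_8);
        System.out.println(rowKeyToString(key, 0, key.length)); // prints 0001
        System.out.println(rowKeyToInt(key, 0, key.length));    // prints 1

        // The same key embedded in a larger backing array.
        byte[] backing = "xx0003yy".getBytes(StandardCharsets.UTF_8);
        System.out.println(rowKeyToInt(backing, 2, 4));         // prints 3
    }
}
```

Inside the PairFunction, the same bytes are also available from the Result itself via getRow(), so either side of the tuple can supply the key. Naming the charset explicitly avoids depending on the platform default, which new String(byte[]) would use.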

I'm not sure which version of Spark you are using. In spark-core_2.10 version 1.2.0, the "newAPIHadoopRDD" method does not return a JavaPairRDD, at least when it is called on a plain SparkContext rather than a JavaSparkContext, and the call looks like this:

RDD<Tuple2<ImmutableBytesWritable, Result>> hBaseRDD = sc.newAPIHadoopRDD(hbaseConf,TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

However, "hBaseRDD" then provides a method to convert it into a JavaRDD if necessary:

hBaseRDD.toJavaRDD();

Then you can call ".mapToPair" on it and pass in the function you defined.