9
votes

Under the assumption that we could access data much faster if pulling directly from HDFS instead of using the HBase API, we're trying to build an RDD based on a Table Snapshot from HBase.

So, I have a snapshot called "dm_test_snap". I seem to be able to get most of the configuration stuff working, but my RDD is null (despite there being data in the Snapshot itself).

I'm having a hell of a time finding an example of anyone doing offline analysis of HBase snapshots with Spark, but I can't believe I'm alone in trying to get this working. Any help or suggestions are greatly appreciated.

Here is a snippet of my code:

object TestSnap  {
  def main(args: Array[String]) {
    val config = ConfigFactory.load()
    val hbaseRootDir =  config.getString("hbase.rootdir")
    val sparkConf = new SparkConf()
      .setAppName("testnsnap")
      .setMaster(config.getString("spark.app.master"))
      .setJars(SparkContext.jarOfObject(this))
      .set("spark.executor.memory", "2g")
      .set("spark.default.parallelism", "160")

    val sc = new SparkContext(sparkConf)

    println("Creating hbase configuration")
    val conf = HBaseConfiguration.create()

    conf.set("hbase.rootdir", hbaseRootDir)
    conf.set("hbase.zookeeper.quorum",  config.getString("hbase.zookeeper.quorum"))
    conf.set("zookeeper.session.timeout", config.getString("zookeeper.session.timeout"))
    conf.set("hbase.TableSnapshotInputFormat.snapshot.name", "dm_test_snap")

    val scan = new Scan
    val job = Job.getInstance(conf)

    TableSnapshotInputFormat.setInput(job, "dm_test_snap", 
        new Path("hdfs://nameservice1/tmp"))

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableSnapshotInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    hBaseRDD.count()

    System.exit(0)
  }

}

Update to include the solution

The trick was, as @Holden mentioned below, that the conf wasn't getting passed through. I got it working by changing the call to newAPIHadoopRDD to this:

val hBaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

There was a second issue, also highlighted by @victor's answer: I was not passing in a scan. To fix that, I added this line and method:

conf.set(TableInputFormat.SCAN, convertScanToString(scan))

def convertScanToString(scan: Scan): String = {
  // Serialize the Scan to protobuf, then Base64-encode it so it fits in a conf string
  val proto = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
}
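
To check that a scan survives this encoding, here is a minimal round-trip sketch. It assumes the HBase 0.96 to 1.x package layout (org.apache.hadoop.hbase.protobuf.ProtobufUtil and org.apache.hadoop.hbase.util.Base64; later HBase versions moved or removed these classes). The object name ScanRoundTrip, the helper convertStringToScan, and the column family "cf" are hypothetical and only there for illustration.

```scala
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos
import org.apache.hadoop.hbase.util.{Base64, Bytes}

object ScanRoundTrip {
  // Same encoding as above: Scan -> protobuf bytes -> Base64 string
  def convertScanToString(scan: Scan): String =
    Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray)

  // Hypothetical inverse helper: Base64 string -> protobuf bytes -> Scan
  def convertStringToScan(encoded: String): Scan =
    ProtobufUtil.toScan(ClientProtos.Scan.parseFrom(Base64.decode(encoded)))

  def main(args: Array[String]): Unit = {
    val scan = new Scan()
    scan.addFamily(Bytes.toBytes("cf")) // "cf" is a made-up column family

    val decoded = convertStringToScan(convertScanToString(scan))
    // Print the column families that survived the round trip
    println(decoded.getFamilies.map(b => Bytes.toString(b)).mkString(","))
  }
}
```

Narrowing the scan (for example to one column family) before encoding it is one way to limit what the snapshot read returns, since the input format rebuilds its Scan from this string.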

This also let me pull out this line from the conf.set commands:

conf.set("hbase.TableSnapshotInputFormat.snapshot.name", "dm_test_snap")

*NOTE: This was for HBase version 0.96.1.1 on CDH5.0

Final full code for easy reference:

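// Imports assume the HBase 0.96-era package layout (see the version note above)
import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Base64
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object TestSnap  {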
  def main(args: Array[String]) {
    val config = ConfigFactory.load()
    val hbaseRootDir =  config.getString("hbase.rootdir")
    val sparkConf = new SparkConf()
      .setAppName("testnsnap")
      .setMaster(config.getString("spark.app.master"))
      .setJars(SparkContext.jarOfObject(this))
      .set("spark.executor.memory", "2g")
      .set("spark.default.parallelism", "160")

    val sc = new SparkContext(sparkConf)

    println("Creating hbase configuration")
    val conf = HBaseConfiguration.create()

    conf.set("hbase.rootdir", hbaseRootDir)
    conf.set("hbase.zookeeper.quorum",  config.getString("hbase.zookeeper.quorum"))
    conf.set("zookeeper.session.timeout", config.getString("zookeeper.session.timeout"))
    val scan = new Scan
    conf.set(TableInputFormat.SCAN, convertScanToString(scan))

    val job = Job.getInstance(conf)

    TableSnapshotInputFormat.setInput(job, "dm_test_snap", 
        new Path("hdfs://nameservice1/tmp"))

    val hBaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    hBaseRDD.count()

    System.exit(0)
  }

  // Serialize the Scan to protobuf and Base64-encode it for storage in the conf
  def convertScanToString(scan: Scan): String = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

}
3
I understand that the only reason to use a snapshot rather than the actual HBase table is to speed up the process. However, have you considered where the RDD reads from when you use an HBase table? Is it the HLog files or something else? Once that is confirmed, are the snapshot and the actual table similar in that respect? We had a similar issue integrating an external framework with HBase: everything worked with the traditional approach, but the framework had limitations whenever we tried something new to cut the time. - Ramzy
I would expect to be directly accessing the HFiles through the snapshot via HDFS, and the gains would come from streaming the data into an RDD straight from disk, bypassing any calls out to HBase. - dmcnelis
A snapshot consists of references to the files that are in the table at the moment the snapshot is taken. No copies of the data are made during the snapshot operation, but copies may be made when a compaction or deletion is triggered. So the newAPIHadoopRDD() method would need extra logic to fetch the actual HFiles from the snapshot, rather than the regular lookup of Hadoop/HBase files. This behavior needs to be confirmed at the RDD level. - Ramzy
It would probably be helpful to have the logs so that we can verify that the information being set is passed all the way down. - Holden
I used this code as a reference for a similar use case, except that instead of a "job" object I'm using TableSnapshotInputFormatImpl.setInput(config, snapShotName, path). This works, but data extraction is very slow: for the same range "R/0"-"R/1", this method took 4 hours to extract 85 GB of data, while a normal job that queried a table over the same range finished in 10 minutes. Any idea what the issue might be? The Spark DAG and "explain" plan for both jobs are exactly the same. - Omkar Rahane

3 Answers

3
votes

Looking at the Job documentation, it makes a copy of the conf object you supply to it ("The Job makes a copy of the Configuration so that any necessary internal modifications do not reflect on the incoming parameter."), so most likely the information that needs to be set on the conf object isn't getting passed down to Spark. You could instead use TableSnapshotInputFormatImpl, which has a similar method that works on conf objects. Additional changes might be needed, but at first pass this seems the most likely cause.
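
A rough sketch of the TableSnapshotInputFormatImpl route, assuming an HBase version that ships that class with a setInput(Configuration, String, Path) method, and assuming a conf that already has hbase.rootdir set. The object and method names (SnapshotRddSketch, snapshotRdd) are made up for illustration, and the code needs a live HDFS and an existing snapshot to actually run.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat, TableSnapshotInputFormatImpl}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Base64
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object SnapshotRddSketch {
  // Hypothetical helper: configures `conf` in place (no Job, so no hidden copy)
  // and builds an RDD over the snapshot's files.
  def snapshotRdd(sc: SparkContext,
                  conf: Configuration,
                  snapshotName: String,
                  restoreDir: String): RDD[(ImmutableBytesWritable, Result)] = {
    // The input format rebuilds its Scan from this Base64-encoded protobuf string
    val scanProto = ProtobufUtil.toScan(new Scan)
    conf.set(TableInputFormat.SCAN, Base64.encodeBytes(scanProto.toByteArray))

    // Writes the snapshot name and restore directory directly into `conf`
    TableSnapshotInputFormatImpl.setInput(conf, snapshotName, new Path(restoreDir))

    sc.newAPIHadoopRDD(conf, classOf[TableSnapshotInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
  }
}
```

With the names from the question, a call would look like snapshotRdd(sc, conf, "dm_test_snap", "hdfs://nameservice1/tmp").count().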

As pointed out in the comments, another option is to use job.getConfiguration to get the updated config from the job object.
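
The copy behavior can be seen with Hadoop alone. The following is a small sketch, not part of the original code: the object name JobConfCopyDemo and the key "demo.key" are invented for illustration. A value set through the job's configuration (which is what TableSnapshotInputFormat.setInput(job, ...) does) is not visible on the conf that was passed to Job.getInstance.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job

object JobConfCopyDemo {
  // Returns (value seen through the original conf, value seen through the job's conf)
  def demo(): (Option[String], Option[String]) = {
    val conf = new Configuration(false) // false: do not load default resources
    val job = Job.getInstance(conf)     // the Job takes its own copy of conf

    // Set a made-up key on the job's copy only
    job.getConfiguration.set("demo.key", "set-after-job-creation")

    (Option(conf.get("demo.key")), Option(job.getConfiguration.get("demo.key")))
  }

  def main(args: Array[String]): Unit = {
    val (fromOriginal, fromJob) = demo()
    println(s"original conf sees: $fromOriginal")
    println(s"job conf sees:      $fromJob")
  }
}
```

This is why passing job.getConfiguration, rather than the original conf, to newAPIHadoopRDD carries the snapshot settings through.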

1
votes

You have not configured your M/R job properly. Here is an example in Java of how to configure M/R over snapshots:

Job job = new Job(conf);
Scan scan = new Scan();
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
       scan, MyTableMapper.class, MyMapKeyOutput.class,
       MyMapOutputValueWritable.class, job, true);

You definitely skipped the Scan. I suggest taking a look at the implementation of TableMapReduceUtil's initTableSnapshotMapperJob to get an idea of how to configure the job in Spark/Scala.

0
votes

Here is the complete configuration in MapReduce (Java):

TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, // Name of the snapshot
                scan, // Scan instance to control CF and attribute selection
                DefaultMapper.class, // mapper class
                NullWritable.class, // mapper output key
                Text.class, // mapper output value
                job,
                true,
                restoreDir);