
I need to bulk edit HBase data, editing the content of a specific cell for each row. Going through the HBase PUT/GET APIs is not an option, since that would be extremely slow. I would like to set up a Spark job that loads the HBase HFiles into properly defined DataFrames, lets me edit the data in specific columns, and then saves the data back to HDFS, maintaining the HFile format.

I found several guides on how to bulk-write HFiles from Spark to HDFS; however, I am not sure how to fetch the data from HDFS in the first place. Which kind of DataFrame/RDD is best suited for this task?

Thanks


1 Answer


Answering myself in case someone else needs this.

It is possible to load the HFiles from an HBase snapshot. Follow this procedure in the HBase shell:

  1. disable 'namespace:table'
  2. snapshot 'namespace:table', 'your_snapshot'

This will create a snapshot that you can access at /[HBase_path]/.snapshot/[your_snapshot]

To load the snapshot as an RDD[(ImmutableBytesWritable, Result)]:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{KeyValueSerialization, MutationSerialization, ResultSerialization, TableInputFormat, TableSnapshotInputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Base64
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def loadFromSnapshot(sc: SparkContext): RDD[(ImmutableBytesWritable, Result)] = {

  // storageDirectory, restoreDirectory, snapshotDirectory, snapshotName and
  // settingsAccessor come from this application's own configuration
  val restorePath =
    new Path(s"hdfs://$storageDirectory/$restoreDirectory/$snapshotName")
  val restorePathString = restorePath.toString

  // create an HBase conf starting from Spark's Hadoop conf
  val hConf = HBaseConfiguration.create()
  val hadoopConf = sc.hadoopConfiguration
  HBaseConfiguration.merge(hConf, hadoopConf)

  // point the HBase root dir to the snapshot dir
  hConf.set("hbase.rootdir",
    s"hdfs://$storageDirectory/$snapshotDirectory/$snapshotName/")

  // point Hadoop to the bucket as the default fs
  hConf.set("fs.default.name", s"hdfs://$storageDirectory/")

  // configure serializations
  hConf.setStrings("io.serializations",
    hadoopConf.get("io.serializations"),
    classOf[MutationSerialization].getName,
    classOf[ResultSerialization].getName,
    classOf[KeyValueSerialization].getName)

  // reset the block cache to its default size and disable the bucket cache
  hConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT)
  hConf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f)
  hConf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY)

  // configure TableSnapshotInputFormat
  hConf.set("hbase.TableSnapshotInputFormat.snapshot.name", settingsAccessor.settings.snapshotName)
  hConf.set("hbase.TableSnapshotInputFormat.restore.dir", restorePathString)

  // fake scan that Spark applies directly to the HFiles; bypasses RPC
  val scan = new Scan()
  val scanString = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }
  hConf.set(TableInputFormat.SCAN, scanString)

  val job = Job.getInstance(hConf)

  TableSnapshotInputFormat.setInput(job, settingsAccessor.settings.snapshotName, restorePath)

  // create the RDD
  sc.newAPIHadoopRDD(job.getConfiguration,
    classOf[TableSnapshotInputFormat],
    classOf[ImmutableBytesWritable],
    classOf[Result])
}

This will load the HFiles from the snapshot directory and apply a "fake" full scan over them, which avoids slow remote procedure calls but produces the same output as a regular scan.
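For example, once the RDD is loaded you can edit the content of one cell per row, as described in the question. This is only a sketch: the column family cf, the qualifier col and the transform function are hypothetical placeholders, and sc is assumed to be your SparkContext.

import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD

// hypothetical family/qualifier and edit logic -- replace with your own
val family    = Bytes.toBytes("cf")
val qualifier = Bytes.toBytes("col")
def transform(value: String): String = value.toUpperCase

// produce (rowKey, newValue) pairs; a plain map, so nothing is shuffled
val edited: RDD[(Array[Byte], Array[Byte])] =
  loadFromSnapshot(sc).map { case (_, result) =>
    val rowKey   = result.getRow
    val oldValue = Bytes.toString(result.getValue(family, qualifier))
    (rowKey, Bytes.toBytes(transform(oldValue)))
  }

The resulting pairs can then be turned into KeyValues and bulk-written back as HFiles, following any of the bulk-write guides mentioned in the question.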

When you are done, you can re-enable your table:

  • enable 'namespace:table'

Optionally, you can also delete the snapshot (no data will actually be deleted):

  • delete_snapshot 'your_snapshot'
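Since the question also asks which kind of DataFrame/RDD fits this task: the RDD[(ImmutableBytesWritable, Result)] returned above is the natural starting point, and if you prefer a DataFrame you can project each Result into a case class. A minimal sketch, assuming Spark 2.x's SparkSession and the same hypothetical cf/col family and qualifier:

import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession

// hypothetical shape of the rows we care about
case class RowData(rowKey: String, col: String)

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = loadFromSnapshot(spark.sparkContext)
  .map { case (_, result) =>
    RowData(
      Bytes.toString(result.getRow),
      Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col"))))
  }
  .toDF()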