4 votes

UPDATE: it seems that the recently released org.apache.beam.sdk.io.hbase-2.6.0 includes the HBaseIO.readAll() API. I tested it in Google Cloud Dataflow, and it seems to be working. Will there be any issues or pitfalls in using HBaseIO directly in a Google Cloud Dataflow setting?

BigtableIO.read takes a PBegin as input. I am wondering if there is anything like SpannerIO's readAll API, where BigtableIO's read could instead take a PCollection of ReadOperations (e.g., Scans) as input and produce a PCollection&lt;Result&gt; from those ReadOperations.
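For reference, a minimal sketch of the SpannerIO readAll() shape being asked for here; the instance, database, table, and column names are placeholders:

    import com.google.cloud.spanner.KeySet;
    import com.google.cloud.spanner.Struct;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
    import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
    import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;

    public class ReadAllShape {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // A PCollection of read operations, not a PBegin: this is the shape
        // the question would like BigtableIO to support for prefix scans.
        PCollection<ReadOperation> operations = pipeline.apply(Create.of(
            ReadOperation.create()
                .withTable("my-table")            // placeholder table
                .withColumns("key", "value")      // placeholder columns
                .withKeySet(KeySet.all())));

        // readAll() consumes the read operations and emits their results.
        PCollection<Struct> rows = operations.apply(SpannerIO.readAll()
            .withSpannerConfig(SpannerConfig.create()
                .withInstanceId("my-instance")    // placeholder instance
                .withDatabaseId("my-db")));       // placeholder database

        pipeline.run();
      }
    }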

I have a use case where I need to run multiple prefix scans, each with a different prefix, and the number of rows sharing a prefix can be small (a few hundred) or large (a few hundred thousand). If nothing like readAll is already available, I am thinking of having a DoFn run a 'limit' scan, and if the limit scan doesn't reach the end of the key range, splitting the remainder into smaller chunks. In my case the key space is uniformly distributed, so the number of remaining rows can be estimated well from the last scanned key (assuming all keys smaller than the last scanned key have been returned by the scan), as sketched below.
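A rough sketch of that estimate, under the stated uniformity assumption; the helper names are illustrative, and keys are coarsely mapped to numbers by their first 8 bytes:

    // Estimate how many rows remain in [startKey, endKey) after a limit scan
    // returned rowsReturned rows ending at lastScannedKey, assuming keys are
    // uniformly distributed over the range.
    static long estimateRemainingRows(
        byte[] startKey, byte[] endKey, byte[] lastScannedKey, long rowsReturned) {
      double start = keyToDouble(startKey);
      double end = keyToDouble(endKey);
      double last = keyToDouble(lastScannedKey);
      double fractionScanned = (last - start) / (end - start);
      if (fractionScanned <= 0) {
        return Long.MAX_VALUE;  // degenerate input; treat as unknown
      }
      // Under the uniform assumption, total rows ~ rowsReturned / fractionScanned.
      return Math.round(rowsReturned / fractionScanned) - rowsReturned;
    }

    // Coarse numeric position of a key: treat its first 8 bytes as a base-256
    // number, zero-padded. Good enough for ratio estimates over uniform keys.
    static double keyToDouble(byte[] key) {
      double value = 0;
      for (int i = 0; i < 8; i++) {
        value = value * 256 + (i < key.length ? (key[i] & 0xff) : 0);
      }
      return value;
    }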

Apologies if similar questions have been asked before.

We have exactly the same problem! – Mingtao Zhang

1 Answer

3 votes

HBaseIO is not compatible with the Bigtable HBase connector due to its region locator logic, and we haven't implemented the SplittableDoFn API for Bigtable yet.

How big are your rows? Are they small enough that scanning a few hundred thousand rows can be handled by a single worker?

If yes, then I'll assume that the expensive work you are trying to parallelize is further down in your pipeline. In this case, you can (see the sketch after this list):

  • create a subclass of AbstractCloudBigtableTableDoFn
  • in the DoFn, use the provided client directly, issuing a scan for each prefix element
  • each row resulting from the scan should be assigned a shard id and emitted as a KV(shard id, row); the shard id should be an incrementing int mod some small multiple of the number of workers
  • then do a GroupByKey after the custom DoFn to fan out the shards; otherwise a single worker would have to process all of the emitted rows for a prefix
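A minimal sketch of that approach, assuming the bigtable-hbase-beam artifact's AbstractCloudBigtableTableDoFn and a placeholder table id; you will also need a coder registered for HBase's Result (e.g., the one Beam's HBase IO module provides):

    import com.google.cloud.bigtable.beam.AbstractCloudBigtableTableDoFn;
    import com.google.cloud.bigtable.beam.CloudBigtableConfiguration;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
    import org.apache.beam.sdk.values.KV;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;

    // Scans one prefix per input element using the connection managed by
    // AbstractCloudBigtableTableDoFn, and emits each row under a rotating
    // shard id so a downstream GroupByKey can fan the rows out.
    class PrefixScanFn extends AbstractCloudBigtableTableDoFn<String, KV<Integer, Result>> {
      private final String tableId;
      private final int numShards;          // some small multiple of the worker count
      private transient int nextShard = 0;  // rotating shard assignment, per instance

      PrefixScanFn(CloudBigtableConfiguration config, String tableId, int numShards) {
        super(config);
        this.tableId = tableId;
        this.numShards = numShards;
      }

      @ProcessElement
      public void processElement(ProcessContext c) throws IOException {
        Scan scan = new Scan()
            .setRowPrefixFilter(c.element().getBytes(StandardCharsets.UTF_8));
        try (Table table = getConnection().getTable(TableName.valueOf(tableId));
             ResultScanner scanner = table.getScanner(scan)) {
          for (Result row : scanner) {
            // Incrementing int mod numShards, as described above.
            c.output(KV.of(nextShard++ % numShards, row));
          }
        }
      }
    }

The output of this DoFn would then flow through GroupByKey.<Integer, Result>create() before the expensive downstream processing.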

If your rows are big and you need to split each prefix scan across multiple workers, then you will have to augment the above approach:

  • in main(), issue a SampleRowKeys request, which will give you rough split points
  • insert a step in your pipeline before the manual scanning DoFn that splits the prefixes using the results from SampleRowKeys; i.e., if the prefix is 'a' and SampleRowKeys contains 'ac', 'ap', 'aw', then the ranges it should emit would be [a, ac), [ac, ap), [ap, aw), [aw, b); assign a shard id and group by it
  • feed the resulting ranges to the manual scan step from above (a sketch of the splitting step follows this list)
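A minimal sketch of that splitting step; each KV holds (inclusive start, exclusive end), the sample keys are assumed to be sorted, and string keys are used for readability:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.values.KV;

    // Splits one prefix into sub-ranges at the SampleRowKeys split points, as
    // in the example above: prefix "a" with samples "ac", "ap", "aw" yields
    // [a, ac), [ac, ap), [ap, aw), [aw, b).
    class PrefixSplitter {
      static List<KV<String, String>> split(String prefix, List<String> sortedSampleKeys) {
        String prefixEnd = nextPrefix(prefix);  // end of the prefix range, "a" -> "b"
        List<KV<String, String>> ranges = new ArrayList<>();
        String start = prefix;
        for (String sample : sortedSampleKeys) {
          // Only samples strictly inside (start, prefixEnd) become split points.
          if (sample.compareTo(start) > 0 && sample.compareTo(prefixEnd) < 0) {
            ranges.add(KV.of(start, sample));
            start = sample;
          }
        }
        ranges.add(KV.of(start, prefixEnd));  // tail range, e.g. [aw, b)
        return ranges;
      }

      // Increment the last character to get the first key just past the prefix.
      static String nextPrefix(String prefix) {
        int lastIndex = prefix.length() - 1;
        return prefix.substring(0, lastIndex) + (char) (prefix.charAt(lastIndex) + 1);
      }
    }

Each emitted range then gets a shard id and a GroupByKey as before, and the manual scan DoFn scans [start, end) via Scan's withStartRow/withStopRow instead of a prefix filter.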