0 votes

Our Dataflow pipeline has a DoFn that reads from Bigtable using the HBase multi-get client API. This seems to cause Dataflow to stall randomly with the following stack:

Processing stuck in step AttachStuff/BigtableAttacher for at least 04h10m00s without outputting or completing in state process
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at com.google.bigtable.repackaged.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:523)
  at com.google.bigtable.repackaged.com.google.api.core.AbstractApiFuture.get(AbstractApiFuture.java:56)
  at com.google.cloud.bigtable.hbase.BatchExecutor.batchCallback(BatchExecutor.java:276)
  at com.google.cloud.bigtable.hbase.BatchExecutor.batch(BatchExecutor.java:239)
  at com.google.cloud.bigtable.hbase.AbstractBigtableTable.get(AbstractBigtableTable.java:241)
  at com.askscio.google.docbuilder.BigtableAnchorsAttacher.getAnchors(BigtableAnchorsAttacher.java:86)
  at com.askscio.google.docbuilder.BigtableAnchorsAttacher.process(BigtableAnchorsAttacher.java:129)
  at com.askscio.docbuilder.core.ScioDoFn.processWithErrorHandling(ScioDoFn.java:39)
  at com.askscio.google.docbuilder.BigtableAnchorsAttacher$DoFnInvoker.invokeProcessElement(Unknown Source)

We are on Beam library 2.12.0. The DoFn initializes the Bigtable connection in @StartBundle.

Each DoFn invocation looks up no more than 10 keys from Bigtable.

It's a single cluster with 3 nodes and SSD storage. Storage utilization is 2.2 GB, max node CPU utilization is 13%, and max read/write rates are 2,000 reads/sec and 1,000 writes/sec.

startBundle:

bigtableConn = BigtableConfiguration.connect(
    config.getString(ConfigKeys.Google.PROJECT_ID),
    config.getString(ConfigKeys.Google.INSTANCE_ID)
);
fooTable = bigtableConn.getTable(TableName.valueOf(BigtableDocumentStore.FOO_TABLE_NAME));

process:

List<Get> gets = Lists.newArrayList();
// keys are no more than 10
for (String s : keys) {
    Get get = new Get(Bytes.toBytes(s))
            .addFamily(Bytes.toBytes(BigtableDocumentStore.FOO_COLUMN_FAMILY))
            .setMaxVersions(1);
    gets.add(get);
}
Result[] results = fooTable.get(gets);

teardown:

fooTable.close();
bigtableConn.close();
Note that startBundle should be paired with finishBundle, and setup with teardown. Given that startBundle is called per bundle, and teardown is called per DoFn instance, perhaps you're opening, and not closing, too many Bigtable connections? – robertwb
So the actual startBundle code is like the one below. Given we only create the connection when the variable is null, this mismatch should not result in a connection leak, right?

    class Foo {
      static Connection conn = null;

      @StartBundle
      public void startBundle(StartBundleContext bundleContext) throws Exception {
        synchronized (Foo.class) {
          if (conn == null) {
            // create bigtable conn and assign conn to that
          }
        }
      }
    }

– Vishwanath T R
OK, that looks fine. Are you (close to) exceeding your read limits? It does look like it's hanging waiting for the read to finish. – robertwb
Per the Bigtable instance page, my 3-node SSD cluster can do 30,000 rows/sec. I checked that the max read rate for my Bigtable instance is 2,000 rows/sec. – Vishwanath T R
Can you provide further detail about the client library version you're using, i.e. the version of bigtable-hbase-beam in your pom.xml? – Ramesh Dharan

1 Answer

1 vote

I would recommend moving connection management to @Setup and @Teardown, and using reference counts in case you are using multi-core workers.

Bigtable connections are very heavyweight and are intended to be a singleton per process. The HBase Connection object returned by BigtableConfiguration.connect() actually wraps a gRPC channel pool with two channels per CPU, which is very expensive to construct.
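The reference-counted singleton pattern recommended above can be sketched in plain Java. This is a hedged illustration: a generic AutoCloseable stands in for the Bigtable Connection (the class and method names here are invented for the sketch, not part of any Bigtable API), so the counting logic can be shown and exercised on its own.

```java
// Sketch: a reference-counted holder so all DoFn instances on a
// multi-core worker share one heavyweight resource. In a real pipeline
// the resource would be the Connection from BigtableConfiguration.connect().
class SharedResource {
    private static final Object lock = new Object();
    private static AutoCloseable resource = null;
    private static int refCount = 0;

    // Call from @Setup: open the resource on the first acquire,
    // hand the same instance to every subsequent caller.
    static AutoCloseable acquire() throws Exception {
        synchronized (lock) {
            if (refCount++ == 0) {
                // Stand-in for BigtableConfiguration.connect(...)
                resource = new AutoCloseable() {
                    @Override public void close() { }
                };
            }
            return resource;
        }
    }

    // Call from @Teardown: close the resource only when the last
    // DoFn instance releases it.
    static void release() throws Exception {
        synchronized (lock) {
            if (--refCount == 0) {
                resource.close();
                resource = null;
            }
        }
    }

    static int activeRefs() {
        synchronized (lock) {
            return refCount;
        }
    }
}
```

Each @Setup calls acquire() and each @Teardown calls release(); the underlying resource is constructed once and torn down once, no matter how many DoFn instances the worker creates.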

You have a few options to improve your pipeline:

  1. Set the config option "google.bigtable.use.cached.data.channel.pool" to "true", which will reuse an internal connection pool

  2. Do something like this in your DoFn:

    // instance vars
    static final Object connectionLock = new Object();
    static Connection bigtableConn = null;
    static int numWorkers = 0;  // reference count of live DoFn instances
    
    // @Setup
    synchronized (connectionLock) {
      if (numWorkers++ == 0) {
        bigtableConn = BigtableConfiguration.connect(...);
      }
    }
    
    // @Teardown
    synchronized (connectionLock) {
      if (--numWorkers == 0) {
        bigtableConn.close();
        bigtableConn = null;
      }
    }
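For option 1, the flag can be set on the HBase Configuration before connecting. A minimal sketch, assuming `BigtableConfiguration.configure(projectId, instanceId)` is available to build a pre-populated Configuration (check your bigtable-hbase version), with `projectId` and `instanceId` as placeholder variables:

```java
// Sketch: enable the cached data channel pool so connections created
// in this process share one underlying gRPC channel pool.
Configuration conf = BigtableConfiguration.configure(projectId, instanceId);
conf.set("google.bigtable.use.cached.data.channel.pool", "true");
Connection bigtableConn = BigtableConfiguration.connect(conf);
```

This keeps the per-bundle connect cheap even if you leave the lifecycle code as-is, since the expensive channel pool is constructed only once.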