Our Dataflow pipeline has a DoFn that reads from Bigtable using the HBase client's multi-get API. This seems to cause Dataflow to stall randomly with the following stack trace:
Processing stuck in step AttachStuff/BigtableAttacher for at least 04h10m00s without outputting or completing in state process
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at com.google.bigtable.repackaged.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:523)
  at com.google.bigtable.repackaged.com.google.api.core.AbstractApiFuture.get(AbstractApiFuture.java:56)
  at com.google.cloud.bigtable.hbase.BatchExecutor.batchCallback(BatchExecutor.java:276)
  at com.google.cloud.bigtable.hbase.BatchExecutor.batch(BatchExecutor.java:239)
  at com.google.cloud.bigtable.hbase.AbstractBigtableTable.get(AbstractBigtableTable.java:241)
  at com.askscio.google.docbuilder.BigtableAnchorsAttacher.getAnchors(BigtableAnchorsAttacher.java:86)
  at com.askscio.google.docbuilder.BigtableAnchorsAttacher.process(BigtableAnchorsAttacher.java:129)
  at com.askscio.docbuilder.core.ScioDoFn.processWithErrorHandling(ScioDoFn.java:39)
  at com.askscio.google.docbuilder.BigtableAnchorsAttacher$DoFnInvoker.invokeProcessElement(Unknown Source)
We are on Beam 2.12.0. The DoFn initializes the Bigtable connection in startBundle.
Each DoFn invocation looks up no more than 10 keys from Bigtable.
The Bigtable instance has a single cluster with 3 SSD nodes. Storage utilization is 2.2 GB, max node CPU utilization is 13%, and max read/write rates are 2000 reads/sec and 1000 writes/sec.
startBundle:
    bigtableConn = BigtableConfiguration.connect(
        config.getString(ConfigKeys.Google.PROJECT_ID),
        config.getString(ConfigKeys.Google.INSTANCE_ID)
    );
    fooTable = bigtableConn.getTable(TableName.valueOf(BigtableDocumentStore.FOO_TABLE_NAME));
process:
    List<Get> gets = Lists.newArrayList();
    // keys holds no more than 10 entries
    for (String s : keys) {
        Get get = new Get(Bytes.toBytes(s))
            .addFamily(Bytes.toBytes(BigtableDocumentStore.FOO_COLUMN_FAMILY))
            .setMaxVersions(1);
        gets.add(get);
    }
    Result[] results = fooTable.get(gets);
teardown:
    fooTable.close();
    bigtableConn.close();
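
The stack trace shows the thread parked inside the blocking multi-get with no deadline. One way to confirm that, and to turn a silent stall into a visible failure, might be to bound the call with a timeout. Below is a minimal sketch using only java.util.concurrent. The names BoundedCall and callWithTimeout are hypothetical helpers for illustration and are not part of Beam or the Bigtable client. Whether the underlying RPC actually stops when the worker thread is interrupted is an assumption and has not been verified.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: runs a blocking call on a separate thread and
// gives up waiting after a deadline instead of parking forever.
public class BoundedCall {
    // Daemon threads so a stuck call cannot keep the JVM alive.
    private static final ExecutorService POOL = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "bounded-call");
        t.setDaemon(true);
        return t;
    });

    public static <T> T callWithTimeout(Callable<T> task, long timeout, TimeUnit unit)
            throws TimeoutException, ExecutionException, InterruptedException {
        Future<T> future = POOL.submit(task);
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            // Best effort: interrupt the worker thread running the task.
            future.cancel(true);
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        // A fast call returns its value normally.
        System.out.println(callWithTimeout(() -> "done", 1, TimeUnit.SECONDS));

        // A slow call surfaces as a TimeoutException rather than a hang.
        try {
            callWithTimeout(() -> {
                Thread.sleep(5_000);
                return "late";
            }, 100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("timed out");
        }
    }
}
```

In the process step above, the lookup could then be written as BoundedCall.callWithTimeout(() -> fooTable.get(gets), 60, TimeUnit.SECONDS), where the 60-second limit is an arbitrary placeholder. A TimeoutException thrown from there would fail the bundle and let Dataflow retry it, which at least makes the stall show up in the logs.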