You don’t have to worry about batching when reading rows: Bigtable responses are streamed and backpressure aware, and the client relies on gRPC to buffer chunks of the stream as well.
Here is an introduction to gRPC streaming:
https://grpc.io/docs/guides/concepts.html#server-streaming-rpc
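In the HBase client this surfaces as a plain ResultScanner: iterating it pulls rows off the buffered stream on demand rather than materializing the whole response. A minimal sketch of that consumption loop, given an already-open connection (it's the same pattern readFullTable uses in the sample below):

  // Rows are pulled lazily from the underlying gRPC stream; only the
  // client-side buffer is held in memory at any one time.
  try (Table table = connection.getTable(TABLE_NAME);
      ResultScanner scanner = table.getScanner(new Scan())) {
    for (Result row : scanner) {
      // Handle one row at a time; backpressure is applied upstream.
    }
  }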
Would you mind trying out this sample code and letting me know if it works (i.e. no deadline-exceeded errors)? If the sample code works, please modify it to scan your own data and confirm that it still works. If something doesn't work, please let me know.
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.google.cloud.example</groupId>
  <artifactId>row-write-read-example</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.google.cloud.bigtable</groupId>
      <artifactId>bigtable-hbase-1.x</artifactId>
      <version>1.0.0-pre3</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.6.2</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
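If you want to run the class straight from Maven (mvn compile exec:java), you could additionally add the exec-maven-plugin to the plugins section above; something like this should work (the plugin version here is just an assumption, use whichever is current):

  <plugin>
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>exec-maven-plugin</artifactId>
    <version>1.6.0</version>
    <configuration>
      <mainClass>WriteReadTest</mainClass>
    </configuration>
  </plugin>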
WriteReadTest.java:
import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import java.io.IOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class WriteReadTest {
  private static final String PROJECT_ID = "<YOUR_PROJECT_ID>";
  private static final String INSTANCE_ID = "<YOUR_INSTANCE_ID>";
  private static final String TABLE_ID = "<YOUR_NONEXISTENT_TABLE>";
  private static final String FAMILY = "cf";
  private static final TableName TABLE_NAME = TableName.valueOf(TABLE_ID);

  public static void main(String[] args) throws IOException {
    try (Connection connection = BigtableConfiguration.connect(PROJECT_ID, INSTANCE_ID);
        Admin admin = connection.getAdmin()) {
      // Setup: create a throwaway table with a single column family.
      admin.createTable(
          new HTableDescriptor(TABLE_NAME)
              .addFamily(new HColumnDescriptor(FAMILY)));
      try {
        // Write the rows
        populateTable(connection, 2_000_000);
        // Read the rows
        readFullTable(connection);
      } finally {
        // Clean up the table even if the read fails.
        admin.disableTable(TABLE_NAME);
        admin.deleteTable(TABLE_NAME);
      }
    }
  }

  private static void populateTable(Connection connection, int rowCount) throws IOException {
    long startTime = System.currentTimeMillis();
    int buckets = 100;
    int maxWidth = Integer.toString(buckets).length();
    // BufferedMutator batches the Puts into bulk requests for throughput.
    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(TABLE_NAME)) {
      for (int i = 0; i < rowCount; i++) {
        // Prefix each key with a bucket id so sequential writes are spread
        // across the key space instead of hot-spotting a single tablet.
        String prefix = String.format("%0" + maxWidth + "d", i % buckets);
        String key = prefix + "-" + String.format("%010d", i);
        String value = "value-" + key;
        Put put = new Put(key.getBytes())
            .addColumn(
                FAMILY.getBytes(),
                HConstants.EMPTY_BYTE_ARRAY,
                value.getBytes());
        bufferedMutator.mutate(put);
      }
    }
    long endTime = System.currentTimeMillis();
    System.out.printf("Populated table in %d secs, writing %d rows\n",
        (endTime - startTime) / 1000, rowCount);
  }

  private static void readFullTable(Connection connection) throws IOException {
    long startTime = System.currentTimeMillis();
    int count = 0;
    // Scan the whole key range; the scanner streams rows back lazily.
    try (Table table = connection.getTable(TABLE_NAME);
        ResultScanner scanner = table.getScanner(new Scan("0".getBytes(), "z".getBytes()))) {
      for (Result row = scanner.next(); row != null; row = scanner.next()) {
        count++;
      }
    }
    long endTime = System.currentTimeMillis();
    System.out.printf("Scanned table in %d secs, reading %d rows\n",
        (endTime - startTime) / 1000, count);
  }
}
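If the sample works but your own scans still hit DEADLINE_EXCEEDED, one knob worth trying is the client-side RPC timeout. A hedged sketch, assuming the google.bigtable.rpc.* property names below match the constants in BigtableOptionsFactory for your client version (please double-check them there before relying on this):

  import com.google.cloud.bigtable.hbase.BigtableConfiguration;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.client.Connection;

  // Assumption: verify these key names against BigtableOptionsFactory.
  Configuration config = BigtableConfiguration.configure(PROJECT_ID, INSTANCE_ID);
  config.set("google.bigtable.rpc.use.timeouts", "true");  // enable per-RPC timeouts
  config.set("google.bigtable.rpc.timeout.ms", "900000");  // 15-minute ceiling per RPC
  Connection connection = BigtableConfiguration.connect(config);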