We are trying to run a daily Dataflow pipeline that reads from Bigtable and dumps the data into GCS (using HBase's Scan and HBaseResultCoder as the coder), as follows (just to highlight the idea):

  Pipeline pipeline = Pipeline.create(options); 
  Scan scan = new Scan();
  scan.setCacheBlocks(false).setMaxVersions(1);
  scan.addFamily(Bytes.toBytes("f"));
  CloudBigtableScanConfiguration btConfig = new CloudBigtableScanConfiguration.Builder().withProjectId("aaa").withInstanceId("bbb").withTableId("ccc").withScan(scan).build();
  pipeline.apply(Read.from(CloudBigtableIO.read(btConfig))).apply(TextIO.Write.to("gs://bucket/dir/file").withCoder(HBaseResultCoder.getInstance()));
  pipeline.run();

This seems to run perfectly as expected.

Now, we want to be able to use the dumped file in GCS for a recovery job if needed. That is, we want a Dataflow pipeline that reads the dumped data (a PCollection<Result>) from GCS and creates Mutations (basically 'Put' objects). For some reason, the following code fails with a bunch of NullPointerExceptions. We are not sure why that would be the case; the if-statements below that check for null or zero-length byte arrays were added to see if they would make a difference, but they did not.

// Part of DoFn<Result,Mutation>
@Override
public void processElement(ProcessContext c) {
  Result result = c.element();
  byte[] row = result.getRow();
  if (row == null || row.length == 0) { // NullPointerException at this line
    return;
  }
  Put mutation = new Put(result.getRow());
  // go through the column/value entries of this row, and create a corresponding put mutation.
  for (Entry<byte[], byte[]> entry : result.getFamilyMap(Bytes.toBytes(cf)).entrySet()) {
    byte[] qualifier = entry.getKey();
    if (qualifier == null || qualifier.length == 0) {
      continue;
    }
    byte[] val = entry.getValue();
    if (val == null || val.length == 0) {
      continue;
    }
    mutation.addImmutable(cf_bytes, qualifier, entry.getValue());
  }
  c.output(mutation);
}

The error we get is the following (line 83 is marked above):

(2a6ad6372944050d): java.lang.NullPointerException at some.package.RecoveryFromGcs$CreateMutationFromResult.processElement(RecoveryFromGcs.java:83)

I have two questions: 1. Has anyone experienced something like this when running a ParDo on a PCollection<Result> to get a PCollection<Mutation> that is then written to Bigtable? 2. Is this a reasonable approach? The end goal is to leave a daily snapshot of our Bigtable (for a specific column family) on a regular basis as a back-up in case something bad happens. We want to be able to read the back-up data via Dataflow and write it back to Bigtable when we need to.

Any suggestions and help will be really appreciated!

-------- Edit

Here is the code that scans Bigtable and dumps data to GCS: (Some details are hidden if they are not relevant.)

public static void execute(Options options) {
  Pipeline pipeline = Pipeline.create(options);
  final String cf = "f"; // some specific column family.
  Scan scan = new Scan();
  scan.setCacheBlocks(false).setMaxVersions(1); // Disable caching and read only the latest cell.
  scan.addFamily(Bytes.toBytes(cf));

  CloudBigtableScanConfiguration btConfig =
      BigtableUtils.getCloudBigtableScanConfigurationBuilder(options.getProject(), "some-bigtable-name").withScan(scan).build();

  PCollection<Result> result = pipeline.apply(Read.from(CloudBigtableIO.read(btConfig)));

  PCollection<Mutation> mutation =
      result.apply(ParDo.of(new CreateMutationFromResult(cf))).setCoder(new HBaseMutationCoder());

  mutation.apply(TextIO.Write.to("gs://path-to-files").withCoder(new HBaseMutationCoder()));

  pipeline.run();
}

The job that reads the output of the above code has the following code (this is the one throwing the exception when reading from GCS):

public static void execute(Options options) {
  Pipeline pipeline = Pipeline.create(options);
  PCollection<Mutation> mutations = pipeline.apply(TextIO.Read
      .from("gs://path-to-files").withCoder(new HBaseMutationCoder()));

  CloudBigtableScanConfiguration config =
      BigtableUtils.getCloudBigtableScanConfigurationBuilder(options.getProject(), btTarget).build();
  if (config != null) {
    CloudBigtableIO.initializeForWrite(pipeline);
    mutations.apply(CloudBigtableIO.writeToTable(config));
  }
  pipeline.run();
}

The error I am getting (https://jpst.it/Qr6M) is a bit confusing, as the mutations are all Put objects but the error is about a 'Delete' object.

1 Answer

It's probably best to discuss this issue on the Cloud Bigtable client GitHub issues page. We are currently working on import/export features like this one, so we'll respond quickly there. We'll explore this approach on our own even if you don't open a GitHub issue, but an issue will let us communicate better.

FWIW, I don't understand how you could get an NPE on the line you highlighted. Are you sure you have the right line?
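
If the reported line is actually off by one (i.e., it is the result.getRow() call that throws), the most likely cause would be the element itself being null after decoding from GCS. That is only a guess on my part, not something the stack trace confirms. A minimal defensive guard, sketched against the processElement() from the question, would look like this:

// Sketch only: assumes the decoded element itself may be null, which is a
// guess about the cause rather than a confirmed diagnosis.
@Override
public void processElement(ProcessContext c) {
  Result result = c.element();
  if (result == null) {
    return; // skip elements that did not decode into a usable Result
  }
  byte[] row = result.getRow();
  if (row == null || row.length == 0) {
    return;
  }
  // ... build and output the Put as before ...
}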

EDIT (12/12):

The following processElement() method should work to convert a Result to a Put:

@Override
public void processElement(DoFn<Result, Mutation>.ProcessContext c) throws Exception {
  Result result = c.element();
  byte[] row = result.getRow();
  if (row != null && row.length > 0) {
    Put put = new Put(row);
    // Copy every cell of the Result into the Put; Put.add(Cell) preserves the
    // family, qualifier, timestamp and value of each cell.
    for (Cell cell : result.rawCells()) {
      put.add(cell);
    }
    c.output(put);
  }
}
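
Wired into the pipeline, this DoFn is applied exactly like the ParDo in your snippet (a sketch reusing your names, where result is the PCollection<Result> read via CloudBigtableIO and cf is the column family):

// Reuses CreateMutationFromResult and the names from the question's pipeline.
PCollection<Mutation> mutations =
    result.apply(ParDo.of(new CreateMutationFromResult(cf)));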