
What Happened
All of last month's data was corrupted by a bug in the system, so we have to delete those records and re-input them manually. Essentially, I want to delete all rows inserted during a certain period of time. However, I'm finding it difficult to scan and delete millions of rows in HBase.

Possible Solutions
I found two ways to bulk delete:
The first is to set a TTL, so that all outdated records would be deleted automatically by the system. But I want to keep the records inserted before last month, so this solution does not work for me.
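The reason a TTL does not fit can be sketched in plain Java (illustrative only, not the HBase API): a TTL expires every cell older than a single cutoff, so it cannot express "delete only the rows written during last month":

```java
// Illustrative sketch only -- plain Java, not HBase code.
// An HBase TTL keeps a cell only while (now - timestamp) < ttl,
// i.e. it expires EVERYTHING older than one cutoff.
public class TtlSketch {
    static boolean survivesTtl(long cellTsMs, long nowMs, long ttlMs) {
        return nowMs - cellTsMs < ttlMs;
    }

    public static void main(String[] args) {
        long now = 100L; // arbitrary "current time"
        long ttl = 30L;  // keep cells younger than 30 time units
        // A row written 40 units ago (last month) is expired, as intended...
        System.out.println(survivesTtl(now - 40, now, ttl)); // false
        // ...but so is a row written 60 units ago, which I want to keep.
        System.out.println(survivesTtl(now - 60, now, ttl)); // false
    }
}
```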

The second option is to write a client using the Java API:

public static void deleteTimeRange(String tableName, Long minTime, Long maxTime) {
    Table table = null;
    Connection connection = null;

    try {
        Scan scan = new Scan();
        scan.setTimeRange(minTime, maxTime);
        connection = HBaseOperator.getHbaseConnection();
        table = connection.getTable(TableName.valueOf(tableName));
        ResultScanner rs = table.getScanner(scan);

        List<Delete> list = getDeleteList(rs);
        if (!list.isEmpty()) {
            table.delete(list);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        if (null != table) {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

private static List<Delete> getDeleteList(ResultScanner rs) {

    List<Delete> list = new ArrayList<>();
    try {

        for (Result r : rs) {
            Delete d = new Delete(r.getRow());
            list.add(d);
        }
    } finally {
        rs.close();
    }
    return list;
}

But in this approach, all the Delete objects are accumulated in a single list before being issued, so the heap usage would be huge. And if the program crashes, it has to start over from the beginning.
So, is there a better way to achieve the goal?


2 Answers

I don't know how many 'millions' you are dealing with in your table, but the simplest thing is not to try to put them all into a List at once, but to process them in more manageable steps using the .next(n) function. Something like this:

Result[] batch;
while ((batch = rs.next(numRows)).length > 0) {
    for (Result row : batch) {
        Delete del = new Delete(row.getRow());
        ...
    }
}

This way, you can control how many rows are returned from the server in a single RPC via the numRows parameter. Make sure it is large enough to avoid too many round-trips to the server, but not so large that it kills your heap. You can also use a BufferedMutator to operate on multiple Deletes at once.
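The trade-off behind numRows can be sketched without HBase at all (plain-Java illustration, not the HBase client): chunked fetching bounds peak heap at one chunk while costing ceil(total / numRows) round-trips instead of one giant transfer:

```java
// Plain-Java sketch of the chunked-fetch trade-off -- not the HBase client.
// Pulling `numRows` results per call keeps at most one chunk in memory
// and costs ceil(total / numRows) round-trips to the server.
public class ChunkSketch {
    static long roundTrips(long totalRows, int numRows) {
        return (totalRows + numRows - 1) / numRows;
    }

    public static void main(String[] args) {
        // 5 million rows fetched 10,000 at a time -> 500 calls,
        // but only 10,000 results held in memory at any moment.
        System.out.println(roundTrips(5_000_000L, 10_000)); // 500
    }
}
```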

Hope this helps.


I would suggest two improvements:

  1. Use BufferedMutator to batch your deletes – it does exactly what you need: it keeps an internal buffer of mutations and flushes it to HBase when the buffer fills up, so you do not have to worry about keeping, sizing, and flushing your own list.
  2. Improve your scan:
    • Use KeyOnlyFilter – since you do not need the values, there is no need to retrieve them
    • Use scan.setCacheBlocks(false) – since you are doing a full-table scan, caching all blocks on the region server does not make much sense
    • Tune scan.setCaching(N) and scan.setBatch(N) – the N will depend on the size of your keys; you should balance fetching more per round-trip against the memory it requires. But since you only transfer keys, N could be quite large, I suppose.
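The flush-on-full behavior described in point 1 can be sketched in plain Java (an illustrative stand-in, not the real org.apache.hadoop.hbase.client.BufferedMutator):

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of buffered mutation -- NOT the real HBase
// BufferedMutator. Mutations accumulate in an internal buffer and are
// flushed automatically whenever the buffer fills up.
public class BufferSketch {
    private final List<String> buffer = new ArrayList<>();
    private final int capacity;
    int flushes = 0; // each flush would be one batched RPC in the real client

    BufferSketch(int capacity) { this.capacity = capacity; }

    void mutate(String delete) {
        buffer.add(delete);
        if (buffer.size() >= capacity) flush();
    }

    void flush() {
        if (!buffer.isEmpty()) {
            flushes++;      // send the whole batch at once
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        BufferSketch m = new BufferSketch(100);
        for (int i = 0; i < 250; i++) m.mutate("delete-" + i);
        m.flush(); // final flush, as close() would do
        System.out.println(m.flushes); // 3 batches for 250 mutations
    }
}
```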

Here's an updated version of your code:

public static void deleteTimeRange(String tableName, Long minTime, Long maxTime) {
    try (Connection connection = HBaseOperator.getHbaseConnection();
         final Table table = connection.getTable(TableName.valueOf(tableName));
         final BufferedMutator mutator = connection.getBufferedMutator(TableName.valueOf(tableName))) {

        Scan scan = new Scan();
        scan.setTimeRange(minTime, maxTime);
        scan.setFilter(new KeyOnlyFilter());
        scan.setCaching(1000);
        scan.setBatch(1000);
        scan.setCacheBlocks(false);
        try (ResultScanner rs = table.getScanner(scan)) {
            for (Result result : rs) {
                mutator.mutate(new Delete(result.getRow()));
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

Note the use of try-with-resources – if you omit it, make sure to .close() the mutator, rs, table, and connection.
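For reference, try-with-resources closes resources automatically in reverse declaration order, even if the body throws – which is why the code above needs no explicit finally block. A minimal plain-Java illustration:

```java
// Minimal illustration of try-with-resources semantics (plain Java).
// Resources declared in the header are closed automatically, in reverse
// declaration order, whether or not the body throws.
public class TwrSketch {
    static final StringBuilder log = new StringBuilder();

    static class Res implements AutoCloseable {
        final String name;
        Res(String name) { this.name = name; }
        @Override public void close() { log.append("close:").append(name).append(' '); }
    }

    public static void main(String[] args) {
        try (Res connection = new Res("connection");
             Res table = new Res("table");
             Res mutator = new Res("mutator")) {
            log.append("work ");
        }
        // mutator closes first, connection last -- mirroring the cleanup
        // order the manual finally blocks in the question had to hand-code.
        System.out.println(log);
    }
}
```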