
My HBase table has 30 million records; each record has the column raw:sample, where raw is the column family and sample is the column qualifier. This column is very large, ranging from a few KB to 50 MB. When I run the following Spark code, it can only get 40 thousand records, but I should get 30 million:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.rdd.RDD

val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "10.1.1.15:2181")
conf.set(TableInputFormat.INPUT_TABLE, "sampleData")
conf.set(TableInputFormat.SCAN_COLUMNS, "raw:sample")
conf.set("hbase.client.keyvalue.maxsize", "0")
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])
var arrRdd: RDD[Map[String, Object]] = hBaseRDD.map(tuple => tuple._2).map(...)

Right now I work around this by getting the id list first, then iterating over that list and fetching the raw:sample column with the plain HBase Java client inside a Spark foreach (see the sketch below). Any ideas why I cannot get all of the raw:sample values through Spark? Is it because the column is too big?
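
A minimal sketch of that workaround (assuming the HBase 1.x-style Connection/Table client; the key list is derived from the same RDD just for illustration, and processRecord is a hypothetical placeholder for the real per-record work):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

// Row keys only; the actual id list could also come from a separate, lighter scan.
val rowKeyRdd = hBaseRDD.map { case (key, _) => key.copyBytes() }

rowKeyRdd.foreachPartition { keys =>
  // One HBase connection per partition, created on the executor.
  val hconf = HBaseConfiguration.create()
  hconf.set("hbase.zookeeper.quorum", "10.1.1.15:2181")
  val connection = ConnectionFactory.createConnection(hconf)
  val table = connection.getTable(TableName.valueOf("sampleData"))
  try {
    keys.foreach { key =>
      val get = new Get(key).addColumn(Bytes.toBytes("raw"), Bytes.toBytes("sample"))
      val sample = table.get(get).getValue(Bytes.toBytes("raw"), Bytes.toBytes("sample"))
      processRecord(key, sample) // placeholder for the actual per-record work
    }
  } finally {
    table.close()
    connection.close()
  }
}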

A few days ago one of my ZooKeeper nodes and one of my datanodes went down, but I fixed them quickly since the replication factor is 3. Could that be the reason? Do you think running hbck -repair would help? Thanks a lot!


2 Answers

1 vote

Internally, TableInputFormat creates a Scan object in order to retrieve the data from HBase.

Try creating a Scan object (without Spark), configured to retrieve the same column from HBase, and see whether the problem repeats:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

// Instantiating the Configuration class
Configuration config = HBaseConfiguration.create();

// Instantiating the HTable class
HTable table = new HTable(config, "emp");

// Instantiating the Scan class
Scan scan = new Scan();

// Scanning the required columns
scan.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("name"));
scan.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("city"));

// Getting the scan result
ResultScanner scanner = table.getScanner(scan);

// Reading values from the scan result
for (Result result = scanner.next(); result != null; result = scanner.next()) {
    System.out.println("Found row : " + result);
}

// Closing the scanner and the table
scanner.close();
table.close();

In addition, by default, TableInputFormat is configured to request a very small chunk of data from the HBase server (which is bad and causes a large overhead). Set the following to increase the chunk size:

scan.setCacheBlocks(false); // don't fill the block cache during a full scan
scan.setCaching(2000);      // fetch 2000 rows per RPC instead of the small default
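
If you stay on Spark's newAPIHadoopRDD path you never build the Scan yourself, but the equivalent settings should be settable on the Hadoop Configuration that TableInputFormat reads when it constructs its internal Scan (a sketch, using TableInputFormat's SCAN_CACHEDROWS / SCAN_CACHEBLOCKS keys):

import org.apache.hadoop.hbase.mapreduce.TableInputFormat

// Same idea, applied to the conf that backs TableInputFormat in the Spark job:
conf.set(TableInputFormat.SCAN_CACHEDROWS, "2000")   // scanner caching: rows per RPC
conf.set(TableInputFormat.SCAN_CACHEBLOCKS, "false") // skip the block cache for full scans
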
1 vote

For a high throughput like yours, Apache Kafka is the best solution for integrating the data flow and keeping the data pipeline alive. Please refer to http://kafka.apache.org/08/uses.html for some Kafka use cases.

One more reference: http://sites.computer.org/debull/A12june/pipeline.pdf