I am trying to connect to HBase from Spark, following the HBase documentation below:

https://hbase.apache.org/book.html#_sparksql_dataframes

Code:

import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
import org.apache.spark.sql.{DataFrame, SparkSession}

val cat =
  s"""{
     |"table":{"namespace":"test", "name":"data_inv"},
     |"rowkey":"key",
     |"columns":{
     |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
     |"col1":{"cf":"src_data", "col":"src_stream_desc", "type":"string"}
     |}
     |}""".stripMargin

    val spark = SparkSession
      .builder()
      .appName(getClass.toString)
      .getOrCreate()

    val hbaseContext = new HBaseContext(spark.sparkContext, spark.sparkContext.hadoopConfiguration)


    val df = withCatalog(cat, spark)

    df.printSchema()

    df.show(20, false)

def withCatalog(cat: String,spark:SparkSession): DataFrame = {
    spark.sqlContext
      .read
      .options(Map(HBaseTableCatalog.tableCatalog->cat))
      .format("org.apache.hadoop.hbase.spark")
      .load()
  }
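
For context: in the catalog above, col0 is mapped to the HBase row key (the special "rowkey" column family) and col1 to the src_data:src_stream_desc column; withCatalog then reads the table through the org.apache.hadoop.hbase.spark data source.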

I am using the connector from: https://github.com/apache/hbase-connectors/tree/master/spark
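
For completeness, the connector dependency looks roughly like this (coordinates as published to Maven Central by the hbase-connectors project; the version here is a placeholder, match it to your HBase and Spark versions):

// build.sbt -- the version below is a placeholder
libraryDependencies += "org.apache.hbase.connectors.spark" % "hbase-spark" % "1.0.0"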

But I am getting the error message below:

Caused by: java.net.SocketTimeoutException: callTimeout=60000, callDuration=69024: Call to hostname/ip:60020 failed on local exception: java.io.IOException: Connection closed row 'data_inv,,' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=hostname,60020,1557761206926, seqNum=-1
    at org.apache.hadoop.hbase.client.RpcRetryingCallerImpl.callWithRetries(RpcRetryingCallerImpl.java:158)
    at org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture.run(ResultBoundedCompletionService.java:80)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Call to hostname/ip:60020 failed on local exception: java.io.IOException: Connection closed
    at org.apache.hadoop.hbase.ipc.IPCUtil.wrapException(IPCUtil.java:180)
    at org.apache.hadoop.hbase.ipc.AbstractRpcClient.onCallFinished(AbstractRpcClient.java:390)
    at org.apache.hadoop.hbase.ipc.AbstractRpcClient.access$100(AbstractRpcClient.java:95)
    at org.apache.hadoop.hbase.ipc.AbstractRpcClient$3.run(AbstractRpcClient.java:410)
    at org.apache.hadoop.hbase.ipc.AbstractRpcClient$3.run(AbstractRpcClient.java:406)
    at org.apache.hadoop.hbase.ipc.Call.callComplete(Call.java:103)
    at org.apache.hadoop.hbase.ipc.Call.setException(Call.java:118)
    at org.apache.hadoop.hbase.ipc.NettyRpcDuplexHandler.cleanupCalls(NettyRpcDuplexHandler.java:202)
    at org.apache.hadoop.hbase.ipc.NettyRpcDuplexHandler.channelInactive(NettyRpcDuplexHandler.java:210)
    at org.apache.hbase.thirdparty.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
    at org.apache.hbase.thirdparty.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
    at org.apache.hbase.thirdparty.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)

Could someone help me understand what exactly this error means?

1 Answer


It looks like your fs.defaultFS is not set correctly. Can you try the following (replace IP:PORT with your NameNode's host and port)?

spark.sparkContext.hadoopConfiguration.set("fs.defaultFS", "hdfs://IP:PORT")
val hbaseContext = new HBaseContext(spark.sparkContext, spark.sparkContext.hadoopConfiguration)
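
If fixing the filesystem setting alone does not resolve the timeout, also make sure the HBase client configuration is visible to Spark (hbase-site.xml on the classpath, or set explicitly). A minimal sketch, with placeholder ZooKeeper hostnames:

import org.apache.hadoop.hbase.HBaseConfiguration

// HBaseConfiguration.create() picks up hbase-site.xml if it is on the
// classpath; the quorum hosts below are placeholders for your cluster.
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "zk1.example.com,zk2.example.com,zk3.example.com")
conf.set("hbase.zookeeper.property.clientPort", "2181")
val hbaseContext = new HBaseContext(spark.sparkContext, conf)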