2 votes

I am trying a simple piece of code in the Spark/Scala REPL and get the error below. How can I resolve this issue? I want to save the RDD to HBase using RDD.saveAsNewAPIHadoopDataset(conf).

import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.OutputFormat
import org.apache.hadoop.hbase.client.Mutation
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.conf.Configuration
import scala.collection.mutable
import java.io.{IOException, File, ByteArrayOutputStream}
import org.apache.spark.HashPartitioner

val tableName = "test"
val cfIndex = "cf".getBytes()
val colIndexId = "c1".getBytes()

val RDD = sc.parallelize(List(("1","2"),("1","2"),("1","3"),  ("3","3")),2).repartition(2).mapPartitions { part =>
  val tableName = "test"
  val cfIndex = "cf".getBytes()
  val colIndexId = "c01".getBytes()
  part.map { case (k, v) =>
    val put = new Put(k.getBytes())
    put.add(cfIndex, colIndexId, v.getBytes())
    (k, put)
  }
}

ERROR TaskSetManager: Task 0.0 in stage 5.0 (TID 17) had a not serializable result: org.apache.hadoop.hbase.client.Put


1 Answer

0 votes

Put is not serializable. Your map produces (String, Put) pairs, so the Put objects become part of the task result. Whenever Spark has to ship that result between JVMs (for example when an action such as collect() or first() sends it from the worker nodes back to the driver), it must be Java-serialized, and since Put does not implement Serializable this is exactly what produces the "not serializable result" error.
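If the goal is just to write the data to HBase, the usual pattern is to let TableOutputFormat write the Puts on the executors via saveAsNewAPIHadoopDataset, so they never have to leave the worker JVMs. Below is a minimal sketch of that approach, not your exact setup: it assumes an HBase 0.98/1.x client (where Put.add(family, qualifier, value) still exists), the Hadoop 2.x Job API, an hbase-site.xml on the classpath, and the table "test" with family "cf" and qualifier "c1" from your snippet; sc is the REPL's SparkContext.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job

// Point TableOutputFormat at the target table (assumed to be "test").
val hconf = HBaseConfiguration.create()
hconf.set(TableOutputFormat.OUTPUT_TABLE, "test")

// A throwaway Job gives us a properly initialised output configuration.
val job = Job.getInstance(hconf)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

// Build the Puts on the executors, keyed by ImmutableBytesWritable as
// TableOutputFormat expects.
val puts = sc.parallelize(List(("1", "2"), ("1", "3"), ("3", "3")), 2).map { case (k, v) =>
  val put = new Put(Bytes.toBytes(k))
  put.add(Bytes.toBytes("cf"), Bytes.toBytes("c1"), Bytes.toBytes(v))
  (new ImmutableBytesWritable(Bytes.toBytes(k)), put)
}

// The Puts are created and written on the executors; nothing is shipped back
// to the driver, so Put never needs to be serialized.
puts.saveAsNewAPIHadoopDataset(job.getConfiguration)

If you really do need the Put objects on the driver (for example to inspect them with collect()), an alternative is to switch Spark to Kryo by setting spark.serializer to org.apache.spark.serializer.KryoSerializer and registering classOf[Put] on the SparkConf, but for a plain write to HBase that should not be necessary.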