1 vote

I'm using Cloudera's SparkOnHBase module to get data from HBase.

I get an RDD this way:

var getRdd = hbaseContext.hbaseRDD("kbdp:detalle_feedback", scan)

Based on that, what I get is an object of type

RDD[(Array[Byte], List[(Array[Byte], Array[Byte], Array[Byte])])]

which corresponds to the row key and a list of values, all of them represented as byte arrays.

If I save getRdd to a file, what I see is:

([B@f7e2590,[([B@22d418e2,[B@12adaf4b,[B@48cf6e81), ([B@2a5ffc7f,[B@3ba0b95,[B@2b4e651c), ([B@27d0277a,[B@52cfcf01,[B@491f7520), ([B@3042ad61,[B@6984d407,[B@f7c4db0), ([B@29d065c1,[B@30c87759,[B@39138d14), ([B@32933952,[B@5f98506e,[B@8c896ca), ([B@2923ac47,[B@65037e6a,[B@486094f5), ([B@3cd385f2,[B@62fef210,[B@4fc62b36), ([B@5b3f0f24,[B@8fb3349,[B@23e4023a), ([B@4e4e403e,[B@735bce9b,[B@10595d48), ([B@5afb2a5a,[B@1f99a960,[B@213eedd5), ([B@2a704c00,[B@328da9c4,[B@72849cc9), ([B@60518adb,[B@9736144,[B@75f6bc34)])

for each record (the row key and its columns).

But what I need is the String representation of each of the keys and values (or at least the values), so that I can save it to a file and see something like

key1,(value1,value2...)

or something like

key1,value1,value2...

I'm completely new to Spark and Scala, and it's been quite hard to get anywhere.

Could you please help me with that?


2 Answers

6 votes

First, let's create some sample data:

scala> val d = List( ("ab" -> List(("qw", "er", "ty")) ), ("cd" -> List(("ac", "bn", "afad")) ) )
d: List[(String, List[(String, String, String)])] = List((ab,List((qw,er,ty))), (cd,List((ac,bn,afad))))

This is what the data looks like:

scala> d foreach println
(ab,List((qw,er,ty)))
(cd,List((ac,bn,afad)))

Convert it to the Array[Byte] format:

scala> val arrData = d.map { case (k,v) => k.getBytes() -> v.map { case (a,b,c) => (a.getBytes(), b.getBytes(), c.getBytes()) } }

arrData: List[(Array[Byte], List[(Array[Byte], Array[Byte], Array[Byte])])] = List((Array(97, 98),List((Array(113, 119),Array(101, 114),Array(116, 121)))), (Array(99, 100),List((Array(97, 99),Array(98, 110),Array(97, 102, 97, 100)))))

Create an RDD out of this data:

scala> val rdd1 = sc.parallelize(arrData)
rdd1: org.apache.spark.rdd.RDD[(Array[Byte], List[(Array[Byte], Array[Byte], Array[Byte])])] = ParallelCollectionRDD[0] at parallelize at <console>:25

Create a conversion function from Array[Byte] to String:

scala> def b2s(a: Array[Byte]): String = new String(a)
b2s: (a: Array[Byte])String
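
A note on charsets: new String(a) decodes with the JVM's default charset, which can differ between machines. If the data is UTF-8 (an assumption here, but a common one for HBase text columns), it is safer to say so explicitly, or to use the helper from hbase-common. A small sketch (b2sHBase is just an illustrative name):

import java.nio.charset.StandardCharsets
import org.apache.hadoop.hbase.util.Bytes

// Decode with an explicit charset instead of the platform default
def b2s(a: Array[Byte]): String = new String(a, StandardCharsets.UTF_8)

// Or use HBase's own helper, which decodes as UTF-8 internally
def b2sHBase(a: Array[Byte]): String = Bytes.toString(a)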

Perform our final conversion:

scala> val rdd2 = rdd1.map { case (k,v) => b2s(k) -> v.map{ case (a,b,c) => (b2s(a), b2s(b), b2s(c)) } }
rdd2: org.apache.spark.rdd.RDD[(String, List[(String, String, String)])] = MapPartitionsRDD[1] at map at <console>:29

scala> rdd2.collect()
res2: Array[(String, List[(String, String, String)])] = Array((ab,List((qw,er,ty))), (cd,List((ac,bn,afad))))
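
To produce the key1,value1,value2... layout asked for in the question, one more map before saving should do it. A minimal sketch (the output path is a placeholder):

val lines = rdd2.map { case (k, vs) =>
  // flatten every (a, b, c) triple and join the key and all values with commas
  (k +: vs.flatMap { case (a, b, c) => List(a, b, c) }).mkString(",")
}
lines.saveAsTextFile("/tmp/feedback_strings")  // placeholder output path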
0 votes

I don't know about HBase, but if those Array[Byte]s are Unicode strings, something like this should work:

val rdd: RDD[(Array[Byte], List[(Array[Byte], Array[Byte], Array[Byte])])] = *whatever*
rdd.map { case (k, l) =>
  (new String(k),
   l.map { case (a, b, c) =>
     // decode each element of the triple
     (new String(a), new String(b), new String(c))
   })
}

Sorry for the rough styling and whatnot; I'm not even sure it will work.
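
One caveat: new String only makes sense when the cells really contain text. If a column stores a serialized number, decoding it as a string produces garbage; HBase's Bytes utility has typed decoders for that case. For example (numericBytes is a hypothetical value):

import org.apache.hadoop.hbase.util.Bytes

// Hypothetical: a column that holds a serialized Long rather than text
val count: Long = Bytes.toLong(numericBytes)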