
I'm trying to parse an RDD[Seq[String]] into a DataFrame. Although it's a Seq of Strings, the values could have a more specific type such as Int, Boolean, Double, String, and so on. For example, a line could be:

"hello", "1", "bye", "1.1"
"hello1", "11", "bye1", "2.1"
...

Another execution could have a different number of columns.

The first column is always going to be a String, the second an Int, and so on, and it's always going to be that way. However, one execution could have a Seq of five elements and another could have 2000, so it depends on the execution. In each execution the name and type of the columns are defined.

To do it, I could have something like this:

import org.apache.spark.sql.types._

//I could have a parameter to generate the StructType dynamically.
def getSchema(): StructType = {
  val schemaArray = scala.collection.mutable.ArrayBuffer[StructField]()
  schemaArray += StructField("col1", IntegerType, true)
  schemaArray += StructField("col2", StringType, true)
  schemaArray += StructField("col3", DoubleType, true)
  StructType(schemaArray)
}

import org.apache.spark.sql.Row

//Array of Any?? it doesn't seem the best option!!
val l1: Seq[Any] = Seq(1, "2", 1.1)
val rdd1 = sc.parallelize(Seq(l1)).map(Row.fromSeq(_))

val schema = getSchema()
val df = sqlContext.createDataFrame(rdd1, schema)
df.show()
df.schema

I don't like having a Seq of Any at all, but it's really what I have. Is there another option?

On the other hand, I was thinking that since what I have is similar to a CSV, I could create one. Spark has a library to read a CSV and return a DataFrame where the types are inferred. Is it possible to call it if I already have an RDD[String]?


1 Answer


Since the number of columns changes for each execution, I would suggest going with the CSV option, with the delimiter set to a space or something else. That way Spark will figure out the column types for you.
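Here is a minimal sketch of that idea, assuming Spark 2.2+ and that `rdd` is your existing RDD[Seq[String]] (the variable names and the space delimiter are just assumptions):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("csv-infer").getOrCreate()
import spark.implicits._

// Join each Seq[String] into a single space-delimited line.
val lines = rdd.map(_.mkString(" "))

// Read the lines as CSV; inferSchema lets Spark work out Int/Double/String per column.
val df = spark.read
  .option("delimiter", " ")
  .option("inferSchema", "true")
  .csv(spark.createDataset(lines))

df.printSchema()

The columns come out named _c0, _c1, and so on, so you can rename them with toDF(...) afterwards if you already know the names.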

Update:

Since you mentioned that you read the data from HBase, one way to go is to convert each HBase row to JSON or CSV and then convert the RDD to a DataFrame:

// Assuming the HBase-Spark connector's HBaseContext and org.json's JSONObject here.
import org.apache.hadoop.hbase.util.Bytes
import org.json.JSONObject

val jsons = hbaseContext.hbaseRDD(tableName, scan).map { case (_, r) =>
  val currentJson = new JSONObject
  val cScanner = r.cellScanner
  // Walk every cell of the row and store it as qualifier -> value in the JSON object.
  while (cScanner.advance) {
    currentJson.put(
      Bytes.toString(cScanner.current.getQualifierArray, cScanner.current.getQualifierOffset, cScanner.current.getQualifierLength),
      Bytes.toString(cScanner.current.getValueArray, cScanner.current.getValueOffset, cScanner.current.getValueLength))
  }
  currentJson.toString
}
val df = spark.read.json(spark.createDataset(jsons))

A similar thing can be done for CSV.
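For completeness, a rough sketch of the CSV variant under the same assumptions (it relies on the cells coming back in the same order for every row, and the space delimiter is arbitrary):

import org.apache.hadoop.hbase.util.Bytes

import spark.implicits._

val csvLines = hbaseContext.hbaseRDD(tableName, scan).map { case (_, r) =>
  val values = scala.collection.mutable.ArrayBuffer[String]()
  val cScanner = r.cellScanner
  // Collect only the cell values; the qualifiers become implicit column positions.
  while (cScanner.advance) {
    values += Bytes.toString(cScanner.current.getValueArray, cScanner.current.getValueOffset, cScanner.current.getValueLength)
  }
  values.mkString(" ")
}

val csvDf = spark.read
  .option("delimiter", " ")
  .option("inferSchema", "true")
  .csv(spark.createDataset(csvLines))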