
I want to convert an RDD to a DataFrame using StructType, but the item "Broken,Line," causes an error. Is there an elegant way to process records like this? Thanks.

import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.Row

val mySchema = StructType(Array(
  StructField("colA", StringType, true),
  StructField("colB", StringType, true),
  StructField("colC", StringType, true)))
val x = List("97573,Start,eee", "9713,END,Good", "Broken,Line,")
val inputx = sc.parallelize(x).
  map((x: String) => Row.fromSeq(x.split(",").slice(0, mySchema.size).toSeq))
val df = spark.createDataFrame(inputx, mySchema)
df.show

The error looks like this:

Name: org.apache.spark.SparkException Message: Job aborted due to stage failure: Task 0 in stage 14.0 failed 1 times, most recent failure: Lost task 0.0 in stage 14.0 (TID 14, localhost, executor driver): java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 2

I'm using:

  • Spark: 2.2.0
  • Scala: 2.11.8

And I ran the code in spark-shell.

What's the error? Your code looks fine. You can use x.split(",", -1) if you're worried about that part. - philantrovert
@philantrovert Thanks, I just updated my question - kingbase
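
For reference, the behaviour mentioned in the comment above can be checked in a plain Scala REPL; split drops trailing empty strings unless a negative limit is passed (a minimal sketch, no Spark APIs involved):

val broken = "Broken,Line,"
broken.split(",")       // Array(Broken, Line)     -> trailing empty field dropped, 2 elements
broken.split(",", -1)   // Array(Broken, Line, "") -> trailing empty field kept, 3 elements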

1 Answer


Row.fromSeq, to which your schema is then applied, throws the error you are getting. The third element of your list splits into just 2 fields, so it cannot be turned into a Row with three elements unless you add a null (or empty) value in place of the missing one.

And when creating your DataFrame, Spark expects 3 elements per Row to apply the schema to, hence the error.
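
To see the mismatch concretely, here is roughly what happens to the third string (a small sketch you can paste into spark-shell):

import org.apache.spark.sql.Row

// "Broken,Line,".split(",") drops the trailing empty field, leaving only 2 elements
val shortFields = "Broken,Line,".split(",")     // Array(Broken, Line)
val shortRow = Row.fromSeq(shortFields.toSeq)   // a Row holding only 2 values
// shortRow.length == 2, but mySchema declares 3 columns, so the encoder
// fails with ArrayIndexOutOfBoundsException: 2 when it tries to read colC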

A quick and dirty solution is to use scala.util.Try to fetch each field separately:

import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.Row
import scala.util.Try

val mySchema = StructType(Array(
  StructField("colA", StringType, true),
  StructField("colB", StringType, true),
  StructField("colC", StringType, true)))

val l = List("97573,Start,eee", "9713,END,Good", "Broken,Line,")

val rdd = sc.parallelize(l).map { x =>
  // keep at most mySchema.size fields from each record
  val fields = x.split(",").slice(0, mySchema.size)
  // Try guards against a missing field and falls back to an empty string
  val f1 = Try(fields(0)).getOrElse("")
  val f2 = Try(fields(1)).getOrElse("")
  val f3 = Try(fields(2)).getOrElse("")
  Row(f1, f2, f3)
}

val df = spark.createDataFrame(rdd, mySchema)

df.show
// +------+-----+----+
// |  colA| colB|colC|
// +------+-----+----+
// | 97573|Start| eee|
// |  9713|  END|Good|
// |Broken| Line|    |
// +------+-----+----+

I wouldn't call it an elegant solution, as you asked for. Parsing strings by hand is never elegant! You ought to use the csv source to read the data correctly (or spark-csv for Spark < 2.x).
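
For example, a minimal sketch with the csv source on Spark 2.2 (the file path is hypothetical; point it at wherever your records actually live):

// Assuming the three records sit in a file such as /tmp/input.csv (hypothetical path)
val csvDf = spark.read
  .schema(mySchema)             // reuse the StructType defined above
  .option("header", "false")
  .csv("/tmp/input.csv")

csvDf.show()
// The short record is parsed against the schema and the missing colC comes back
// as null instead of failing the job.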