17
votes

I have a Spark DF with rows of Seq[(String, String, String)]. I'm trying to do some kind of flatMap with that, but anything I try ends up throwing

java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple3

I can take a single row or multiple rows from the DF just fine

df.map{ r => r.getSeq[Feature](1)}.first

returns

Seq[(String, String, String)] = WrappedArray([ancient,jj,o], [olympia_greece,nn,location] .....

and the data type of the RDD seems correct.

org.apache.spark.rdd.RDD[Seq[(String, String, String)]]

The schema of the df is

root
 |-- article_id: long (nullable = true)
 |-- content_processed: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- lemma: string (nullable = true)
 |    |    |-- pos_tag: string (nullable = true)
 |    |    |-- ne_tag: string (nullable = true)

I know this problem is related to Spark SQL treating the RDD rows as org.apache.spark.sql.Row, even though it insists the type is Seq[(String, String, String)]. There's a related question (linked below), but its answer doesn't work for me, and I'm not familiar enough with Spark to turn it into a working solution.

Are the rows Row[Seq[(String, String, String)]], Row[(String, String, String)], Seq[Row[(String, String, String)]], or something even crazier?

I'm trying to do something like

df.map{ r => r.getSeq[Feature](1)}.map(_(1)._1)

which appears to work but doesn't actually

df.map{ r => r.getSeq[Feature](1)}.map(_(1)._1).first

throws the above error. So how am I supposed to (for instance) get the first element of the second tuple on each row?

Also, WHY has Spark been designed to do this? It seems idiotic to claim that something is of one type when in fact it isn't and cannot be converted to the claimed type.


Related question: GenericRowWithSchema exception in casting ArrayBuffer to HashSet in DataFrame to RDD from Hive table

Related bug report: http://search-hadoop.com/m/q3RTt2bvwy19Dxuq1&subj=ClassCastException+when+extracting+and+collecting+DF+array+column+type

2
May I ask who down-voted this question and why? – eliasah
The related bug report was the solution for me. – Istvan Nagy

2 Answers

20
votes

Well, it doesn't claim that it is a tuple. It claims it is a struct, which maps to Row:

import org.apache.spark.sql.Row

case class Feature(lemma: String, pos_tag: String, ne_tag: String)
case class Record(id: Long, content_processed: Seq[Feature])

val df = Seq(
  Record(1L, Seq(
    Feature("ancient", "jj", "o"),
    Feature("olympia_greece", "nn", "location")
  ))
).toDF

val content = df.select($"content_processed").rdd.map(_.getSeq[Row](0))

You'll find exact mapping rules in the Spark SQL programming guide.
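
For example, pulling the first element back shows Rows rather than tuples (the WrappedArray below is the same shape the question reports):

content.first
// WrappedArray([ancient,jj,o], [olympia_greece,nn,location]), i.e. a Seq[Row], not a Seq of tuples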

Since Row is not exactly a pretty structure, you'll probably want to map it to something useful:

content.map(_.map {
  case Row(lemma: String, pos_tag: String, ne_tag: String) => 
    (lemma, pos_tag, ne_tag)
})

or:

content.map(_.map ( row => (
  row.getAs[String]("lemma"),
  row.getAs[String]("pos_tag"),
  row.getAs[String]("ne_tag")
)))
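
Either way the elements end up as plain Scala tuples, so the original goal (the first element of the second tuple on each row) is, for example:

content.map(_.map {
  case Row(lemma: String, pos_tag: String, ne_tag: String) =>
    (lemma, pos_tag, ne_tag)
}).map(_(1)._1).first
// olympia_greece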

Finally a slightly more concise approach with Datasets:

df.as[Record].rdd.map(_.content_processed)

or

df.select($"content_processed").as[Seq[(String, String, String)]]

although this seems to be slightly buggy at this moment.

There is an important difference between the first approach (Row.getAs) and the second one (Dataset.as). The former extracts objects as Any and applies asInstanceOf. The latter uses encoders to transform between the internal types and the desired representation.
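
A small sketch of that difference, using the df and Record defined above (the getAs line is left commented out because it only fails once the elements are touched at runtime):

// Cast-based: compiles, but the elements are still Rows underneath, so treating
// them as tuples throws the ClassCastException from the question:
// df.rdd.map(_.getAs[Seq[(String, String, String)]]("content_processed")).map(_.head._1).first

// Encoder-based: the encoder converts the internal rows into Feature objects:
df.as[Record].rdd.map(_.content_processed.head.lemma).first  // "ancient"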

0
votes
import org.apache.spark.sql.{Row, SparkSession}

object ListSerdeTest extends App {

  implicit val spark: SparkSession = SparkSession
    .builder
    .master("local[2]")
    .getOrCreate()

  import spark.implicits._

  // The (String, String) tuples are encoded as nested structs in the schema.
  val myDS = spark.createDataset(
    Seq(
      MyCaseClass(mylist = Array(("asd", "aa"), ("dd", "ee")))
    )
  )

  myDS.toDF().printSchema()

  // After toDF() each tuple comes back as a Row, so it has to be
  // pattern matched (or read with getAs) field by field.
  myDS.toDF().foreach(
    row => {
      row.getSeq[Row](row.fieldIndex("mylist"))
        .foreach {
          case Row(a, b) => println(a, b)
        }
    }
  )
}

case class MyCaseClass(
  mylist: Seq[(String, String)]
)

The code above is yet another way to deal with the nested structure. Spark's default Encoder encodes TupleX as a nested struct, which is why you are seeing this strange behaviour. And, as others have said, you can't just do getAs[T]() since it's just a cast (x.asInstanceOf[T]) and will therefore give you runtime exceptions.
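
As a side note (a small sketch building on the example above), if you stay in the typed Dataset instead of going through toDF(), the encoder turns the structs back into tuples and no Row handling is needed:

// No Row pattern matching required: the encoder rebuilds the tuples.
myDS
  .map(_.mylist.map(_._1))   // Dataset[Seq[String]]
  .collect()
  .foreach(println)          // prints something like WrappedArray(asd, dd)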