1
votes

I'm using Spark and Scala to read some Parquet files. The problem is that the content of these Parquet files may vary: some fields are sometimes not present. So when I try to access a field which doesn't exist in a file, I get the following exception:

java.lang.IllegalArgumentException: Field "wrongHeaderIndicator" does not exist.

I did something similar in Java once, and it was possible to use contains() or get(index) != null to check whether the field we are trying to access exists. But I am not able to do the same in Scala.

Below you can see what I have written so far and the four things I tried, without success.

//The part of reading the parquet file and accessing the rows works fine
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

val parquetFileDF = sqlContext.read.parquet("myParquet.parquet")

//I get one of the six blocks in the parquet file
val myHeaderData = parquetFileDF.select("HeaderData").collectAsList()

//When I try to access a particular field which is not in the "HeaderData"
//I get the exception

//1st Try
Option(myHeaderData.get(0).getStruct(0).getAs[String]("wrongHeaderIndicator")) match {
      case Some(i) => println("This data exist")
      case None => println("This would be a null") 
}

//2nd Try
if(myHeaderData.get(0).getStruct(0).getAs[String]("wrongHeaderIndicator")!= null)
        println("This data exist")
    else
        println("This is null")

//3rd Try
println(myHeaderData.get(0).getStruct(0).fieldIndex("wrongHeaderIndicator"))

//4th Try
println(Some(myHeaderData.get(0).getStruct(0).getAs[String]("wrongHeaderIndicator")))

Edit: The problem is not accessing the columns of the DataFrame. The columns are always the same, and I don't need to perform any checks before the select. The problem comes once I access the fields of the records in a particular column. Those records are structs whose schema you can see below:

The schema of the column myHeaderData is similar to:

|-- myHeaderData: struct (nullable = true)
 |    |-- myOpIndicator: string (nullable = true)
 |    |-- mySecondaryFlag: string (nullable = true)
 |    |-- myDownloadDate: string (nullable = true)
 |    |-- myDownloadTime: string (nullable = true)
 |    |-- myUUID: string (nullable = true)

And if I run

myHeaderData.get(0).getStruct(0).schema

I get the following output:

StructType(StructField(myOpIndicator,StringType,true), StructField(mySecondaryFlag,StringType,true), StructField(myDownloadDate,StringType,true), StructField(myDownloadTime,StringType,true), StructField(myUUID,StringType,true))

All four attempts produce the same exception. Can anyone tell me what I can use to check whether a field exists in a struct without generating the exception?

Thanks

I want to mention that I already looked for this question on Stack Overflow, but I couldn't find a solution to my problem. If someone could refer me to an already answered question, please add it in a comment and I will close this question on my own. - Ignacio Alorre
It seems that you are trying to look up "wrongHeaderIndicator" and compare its output. But you should be checking whether "wrongHeaderIndicator" itself exists. - Gabain1993
@Gabain1993 Yes. How can I check whether it exists? - Ignacio Alorre
@user8371915, no. I do know how to detect whether there is a column in a DataFrame. Here what I want is to check whether there is an element inside a struct. - Ignacio Alorre

2 Answers

2
votes

You're making a wrong assumption. If you call getAs on a field which doesn't exist, it throws an exception rather than returning null. Therefore you should use Try:

import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.Row

val row: Row = spark.read.json(sc.parallelize(Seq(
  """{"myHeaderData": {"myOpIndicator": "foo"}}"""))).first

Try(row.getAs[Row]("myHeaderData").getAs[String]("myOpIndicator")) match {
  case Success(s) => println(s)
  case _ => println("error")
}
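
If you would rather not rely on catching the exception, the same check can probably be made against the schema that the nested Row carries (the one printed by getStruct(0).schema in the question). The following is only a minimal sketch: getStringOpt is a hypothetical helper name, not part of Spark, and the local SparkSession is there just to make the snippet self-contained (in spark-shell, spark already exists). It assumes Spark 2.2 or later, where read.json accepts a Dataset[String].

```scala
import org.apache.spark.sql.{Row, SparkSession}

// Local session for illustration only (assumption: Spark 2.2+).
val spark = SparkSession.builder().master("local[1]").appName("field-check").getOrCreate()
import spark.implicits._

// One record whose struct carries only myOpIndicator.
val row: Row = spark.read
  .json(Seq("""{"myHeaderData": {"myOpIndicator": "foo"}}""").toDS)
  .first

val header: Row = row.getAs[Row]("myHeaderData")

// Hypothetical helper: None when the field is absent from the schema or its value is null.
def getStringOpt(r: Row, name: String): Option[String] =
  if (r.schema != null && r.schema.fieldNames.contains(name)) Option(r.getAs[String](name))
  else None

val present = getStringOpt(header, "myOpIndicator")        // field exists
val missing = getStringOpt(header, "wrongHeaderIndicator") // field does not exist, no exception
```

The null check on r.schema is there because a Row built by hand (for example with Row("foo")) has no schema attached; rows collected from a DataFrame do.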
2
votes

You can easily check if a column exists in your dataframe or not. Use the df.columns method to get an array with all the headers in your data, then check if your column (wrongHeaderIndicator) is in the array or not. Here is a short example:

val df = Seq(("aaa", "123"), ("bbb", "456"), ("ccc", "789")).toDF("col1", "col2")
df.show()

+----+----+
|col1|col2|
+----+----+
| aaa| 123|
| bbb| 456|
| ccc| 789|
+----+----+

Using df.columns.toList will now give you List(col1, col2). To check if your field is present or not, simply do df.columns.contains("fieldName").
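
Note that df.columns only lists top-level column names, so it will not see a field nested inside a struct column such as the one in the question. A similar check can be sketched against the DataFrame schema instead. This is a rough illustration under assumptions: structHasField is a hypothetical helper name, the sample data mirrors the myHeaderData struct from the question, and the local SparkSession (Spark 2.2 or later) is only there to make the snippet runnable on its own.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.StructType

// Local session for illustration only (assumption: Spark 2.2+).
val spark = SparkSession.builder().master("local[1]").appName("nested-field-check").getOrCreate()
import spark.implicits._

// Hypothetical helper: true only if `column` exists, is a struct, and contains `field`.
def structHasField(df: DataFrame, column: String, field: String): Boolean =
  df.schema.fields.find(_.name == column).map(_.dataType) match {
    case Some(st: StructType) => st.fieldNames.contains(field)
    case _                    => false
  }

// Sample data shaped like the struct column in the question.
val df = spark.read.json(Seq("""{"myHeaderData": {"myOpIndicator": "foo"}}""").toDS)

val hasOp    = structHasField(df, "myHeaderData", "myOpIndicator")
val hasWrong = structHasField(df, "myHeaderData", "wrongHeaderIndicator")
```

Because the check runs on the schema, it happens once per DataFrame rather than once per collected row.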