0 votes

Environment - spark-3.0.1-bin-hadoop2.7, ScalaLibraryContainer 2.12.3, Scala, SparkSQL, eclipse-jee-oxygen-2-linux-gtk-x86_64

I have a CSV file with 3 columns of data types String, Long, and Date. I converted the CSV file to a DataFrame and want to show it, but it gives the following error:

java.lang.ArrayIndexOutOfBoundsException: 2
at org.apache.spark.examples.sql.SparkSQLExample5$.$anonfun$runInferSchemaExample$2(SparkSQLExample5.scala:30)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:448)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:448)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at this Scala code:

.map(attributes => Person(attributes(0), attributes(1),attributes(2))).toDF();

The error occurs when subsequent rows have fewer values than the header. Basically, I am trying to read data from a CSV using Scala and Spark where some columns have null values.

The rows don't all have the same number of columns. It runs successfully if every row has all 3 column values.
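From what I can tell, the trailing comma is the trigger: java.lang.String.split drops trailing empty strings unless a negative limit is passed, so the second row splits into only 2 elements. A minimal sketch of that behaviour on the sample row (illustration only, not part of the original program):

"row21,row22,".split(",")      // Array(row21, row22)     -> 2 elements, attributes(2) is out of bounds
"row21,row22,".split(",", -1)  // Array(row21, row22, "") -> limit -1 keeps the trailing empty field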

package org.apache.spark.examples.sql

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.sql.Date
import org.apache.spark.sql.functions._
import java.util.Calendar;

object SparkSQLExample5 {

 case class Person(name: String, age: String, birthDate: String)

  def main(args: Array[String]): Unit = {
    val fromDateTime = java.time.LocalDateTime.now;
    val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.master", "local").getOrCreate();
    import spark.implicits._
    runInferSchemaExample(spark);
    spark.stop()
  }

  private def runInferSchemaExample(spark: SparkSession): Unit = {
    import spark.implicits._
    println("1. Creating an RDD of 'Person' objects and converting it into a 'DataFrame'. " +
      "2. Registering the DataFrame as a temporary view.")
    println("The third column of the second row is not present; the last value of the second row is a comma.")
    val peopleDF = spark.sparkContext
      .textFile("examples/src/main/resources/test.csv")
      .map(_.split(","))
      // attributes(2) fails here: split(",") drops trailing empty fields, so a row ending
      // with a comma yields only 2 elements and throws ArrayIndexOutOfBoundsException: 2
      .map(attributes => Person(attributes(0), attributes(1), attributes(2))).toDF();
    val finalOutput = peopleDF.select("name", "age", "birthDate")
    finalOutput.show();
  }

}

CSV file:

col1,col2,col3
row21,row22,
row31,row32,

2 Answers

1 vote

Try PERMISSIVE mode when reading the CSV file; it will add NULL for missing fields:

val df = spark.sqlContext.read.format("csv").option("mode", "PERMISSIVE").load("examples/src/main/resources/test.csv")

You can find more information at https://docs.databricks.com/data/data-sources/read-csv.html
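For reference, a slightly fuller sketch of the same idea (the header option, the explicit schema, and the column names are my additions, not from the linked docs; PERMISSIVE is also the default mode for the CSV source):

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", StringType, nullable = true),
  StructField("birthDate", StringType, nullable = true)
))

val df = spark.read
  .option("header", "true")      // skip the col1,col2,col3 header line
  .option("mode", "PERMISSIVE")  // keep short/malformed rows, filling missing fields with null
  .schema(schema)
  .csv("examples/src/main/resources/test.csv")

df.show()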

0 votes

Input: CSV file

col1,col2,col3
row21,row22,
row31,row32,

Code:

import org.apache.spark.sql.SparkSession

object ReadCsvFile {

  case class Person(name: String, age: String, birthDate: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.master", "local").getOrCreate();
    readCsvFileAndInferCustomSchema(spark);
    spark.stop()
  }

  private def readCsvFileAndInferCustomSchema(spark: SparkSession): Unit = {
    // the CSV reader yields a 3-column DataFrame and leaves the missing third field as null
    val df = spark.read.csv("C:/Users/Ralimili/Desktop/data.csv")
    // drop the header row (the first row of the first partition)
    val rdd = df.rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
    // map each Row to the Person case class; getString returns null for missing values
    val mapRdd = rdd.map(attributes => {
      Person(attributes.getString(0), attributes.getString(1), attributes.getString(2))
    })
    val finalDf = spark.createDataFrame(mapRdd)
    finalDf.show(false);
  }

}

Output:

+-----+-----+---------+
|name |age  |birthDate|
+-----+-----+---------+
|row21|row22|null     |
|row31|row32|null     |
+-----+-----+---------+

If you want to fill in values instead of nulls, use the code below:

 val customizedNullDf = finalDf.na.fill("No data")
 customizedNullDf.show(false);

Output:

+-----+-----+---------+
|name |age  |birthDate|
+-----+-----+---------+
|row21|row22|No data  |
|row31|row32|No data  |
+-----+-----+---------+
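As a side note (my addition, not part of the original answer): DataFrameNaFunctions.fill also accepts a list of column names, so the replacement can be limited to specific columns:

 val customizedNullDf = finalDf.na.fill("No data", Seq("birthDate"))
 customizedNullDf.show(false);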