I'm trying to create a DataFrame in Spark by reading a CSV. The problem is that if I don't do anything in particular, the DataFrame ends up with every column typed as string:

root
 |-- ticker: string (nullable = true)
 |-- open: string (nullable = true)
 |-- close: string (nullable = true)
 |-- adj_close: string (nullable = true)
 |-- low: string (nullable = true)
 |-- high: string (nullable = true)
 |-- volume: string (nullable = true)
 |-- date: string (nullable = true)

To solve this I set the option "inferSchema" to true, like this:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("Job One")
  .master("local")
  .config("spark.eventLog.enabled", "true")
  .config("spark.eventLog.dir", spark_events)
  .getOrCreate()
import spark.implicits._

val df = spark.read
  .format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .load(historicalStockPrices)

df.printSchema()

And this way I obtain this instead:

root
 |-- ticker: string (nullable = true)
 |-- open: double (nullable = true)
 |-- close: double (nullable = true)
 |-- adj_close: double (nullable = true)
 |-- low: double (nullable = true)
 |-- high: double (nullable = true)
 |-- volume: long (nullable = true)
 |-- date: string (nullable = true)

Which is what I want, but adding the inferSchema option means the job takes 1.4 minutes instead of just 6 seconds without it. Another way to get the columns with the types I want is withColumn, like this:

val df2 = df
  .withColumn("open", df("open").cast("Float"))
  .withColumn("close", df("close").cast("Float"))
  .withColumn("adj_close", df("adj_close").cast("Float"))
  .withColumn("low", df("low").cast("Float"))
  .withColumn("high", df("high").cast("Float"))
  .withColumn("volume", df("volume").cast("Long"))

df2.printSchema()

The result of the whole operation this time is just 6 seconds again. What gives?

2 Answers


The answer to your question: when you specify .option("inferSchema", "true"), Spark has to read the whole file an extra time to work out each column's type, because you haven't specified a sampling fraction for the inference. That extra pass is what takes the time, which is why schema inference is rarely used on big files.
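If you still want inference but cheaper, one option is to infer from a sample of the rows instead of all of them. A minimal sketch, assuming a Spark version whose CSV reader supports the samplingRatio option (Spark 3.0+):

// Infer column types from a fraction of the rows instead of the whole file
val dfSampled = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("samplingRatio", "0.1") // inspect ~10% of rows when inferring types
  .option("mode", "DROPMALFORMED")
  .load(historicalStockPrices)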


Maybe this can help: why don't you try creating your own schema as a StructType and then, right before load, passing it with the schema method? While reading the CSV, your code will look like:

// Assuming you've already created your schema

val df = spark.read
  .format("csv")
  .option("header", "true")
  .schema(customSchema)
  .option("mode", "DROPMALFORMED")
  .load(historicalStockPrices)
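For reference, a minimal sketch of what customSchema could look like for the columns in the question, mirroring the types that inferSchema produced (whether you use FloatType or DoubleType for the prices is your choice):

import org.apache.spark.sql.types._

// Hand-written schema for the question's columns; adjust types as needed
val customSchema = StructType(Seq(
  StructField("ticker", StringType, nullable = true),
  StructField("open", DoubleType, nullable = true),
  StructField("close", DoubleType, nullable = true),
  StructField("adj_close", DoubleType, nullable = true),
  StructField("low", DoubleType, nullable = true),
  StructField("high", DoubleType, nullable = true),
  StructField("volume", LongType, nullable = true),
  StructField("date", StringType, nullable = true)
))

With an explicit schema Spark skips the inference pass entirely, so the read should stay at roughly the 6 seconds you measured.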