
I have a use case where I have to load millions of JSON-formatted records into Apache Hive tables. My solution is simply to load them into a DataFrame and write them out as Parquet files, then create an external table on top of them.

I am using Apache Spark 2.1.0 with Scala 2.11.8.
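
Roughly, the pipeline looks like this (the paths, table name, and column list below are only illustrative, and the CREATE TABLE step assumes an existing Hive-enabled SparkSession):

// spark: an existing Hive-enabled SparkSession
val df = spark.read.json("/data/incoming/messages")            // hypothetical input path
df.write.mode("overwrite").parquet("/warehouse/messages_pq")   // hypothetical output path

spark.sql(
  """CREATE EXTERNAL TABLE IF NOT EXISTS messages (amount DOUBLE)
    |STORED AS PARQUET
    |LOCATION '/warehouse/messages_pq'""".stripMargin)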

All the messages follow a somewhat flexible schema. For example, the column "amount" can hold the value 1.0 or 1.

Since I am transforming data from a semi-structured format to a structured format, but my schema is slightly variable, I assumed the inferSchema option for data sources like JSON would help me:

spark.read.option("inferSchema","true").json(RDD[String])

When I read the JSON data with inferSchema set to true:

Case 1: for smaller data, all the Parquet files have amount as double.

Case 2: for larger data, some Parquet files have amount as double and others have int64.

I tried to debug this and came across concepts like schema evolution and schema merging, which went over my head and left me with more doubts than answers.

My doubts/questions are:

  1. When I infer the schema, does it not enforce the inferred schema on the full dataset?

  2. Since I cannot enforce a fixed schema due to my constraints, I thought of casting the whole column to the double data type, as it can hold both integers and decimal numbers. Is there a simpler way?

  3. My guess is that since the data is partitioned, inferSchema works per partition and then gives me a general schema, but it does not enforce that schema or anything of the sort. Please correct me if I am wrong.

Note: the reason I am using the inferSchema option is that the incoming data is too flexible/variable to enforce a case class of my own, though some of the columns are mandatory. If you have a simpler solution, please suggest it.


1 Answer


Schema inference really just processes all the rows to find the types. Once it has done that, it merges the results to find a schema common to the whole dataset.

For example, some of your fields may have values in some rows but not in others, so the inferred schema marks those fields as nullable.
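
As a minimal sketch of that behaviour (the sample records and session setup below are illustrative, not taken from your job):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("infer-demo").master("local[*]").getOrCreate()

// "amount" is an integer in one record, a decimal in another, and missing in a third
val sample = Seq("""{"amount": 1}""", """{"amount": 1.0}""", """{"id": "x"}""")
val df = spark.read.json(spark.sparkContext.parallelize(sample))

df.printSchema()
// root
//  |-- amount: double (nullable = true)
//  |-- id: string (nullable = true)

Because all three records are seen in one read, the conflicting integer/decimal values are merged to double and amount is marked nullable. A batch that only ever contains integer values would come out as long (int64 in Parquet), which is one way mixed Parquet files can arise across separate runs.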

To answer your question: it's fine to infer the schema for your input. However, since you intend to use the output in Hive, you should ensure all the output files have the same schema.

An easy way to do this is to use casting (as you suggest). I typically like to do a select at the final stage of my jobs and list all the columns and types explicitly; I feel this makes the job more readable.

e.g.

import org.apache.spark.sql.types.{IntegerType, StringType}
import spark.implicits._  // enables the $"colName" column syntax

df
  .coalesce(numOutputFiles)  // control how many output files are written
  .select(                   // pin every column to an explicit name and type
    $"col1".cast(IntegerType).as("col1"),
    $"col2".cast(StringType).as("col2"),
    $"someOtherCol".cast(IntegerType).as("col3")
  )
  .write.parquet(outPath)
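
Applied to your case, that final select would cast amount to DoubleType, so every Parquet file you write ends up with amount as double regardless of what inference happened to produce for that particular batch.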