Here is the spark-shell script I am using to convert CSV data to Parquet:
import org.apache.spark.sql.types._;
val sqlContext = new org.apache.spark.sql.SQLContext(sc);
val df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load("/uploads/01ff5191-27c4-42db-a8e0-0d6594de3a5d/Worker_Snapshot_50_100000.csv");
val schema = StructType(Array(
StructField("CF_EmplID", StringType,true),
StructField("Worker", StringType,true),
StructField("CF_Sup Org Level 1", StringType,true),
StructField("CF_Sup Org Level 2", StringType,true),
StructField("CF_Sup Org Level 3", StringType,true),
StructField("CF_Sup Org Level 4", StringType,true),
StructField("Business Process Transaction", StringType,true),
StructField("Record Date", StringType,true),
StructField("CF_Fiscal Period", StringType,true),
StructField("Business Process Type", StringType,true),
StructField("Business Process Reason", StringType,true),
StructField("Active Status", BooleanType,true),
StructField("Age Group", StringType,true),
StructField("Annual Base Pay", StringType,true),
StructField("Base Pay Segment", StringType,true),
StructField("Compa-Ratio", StringType,true),
StructField("Company", StringType,true),
StructField("Compensation Grade", BooleanType,true),
StructField("Contingent Worker Type", StringType,true),
StructField("Cost Center", StringType,true),
StructField("Current Rating", StringType,true),
StructField("Employee Type", StringType,true),
StructField("Ending Headcount", IntegerType,true),
StructField("Ethnicity", StringType,true),
StructField("Exempt", BooleanType,true),
StructField("FTE", StringType,true),
StructField("Gender", StringType,true),
StructField("Highest Degree", StringType,true),
StructField("Hire Count", IntegerType,true),
StructField("Hire Year Text", IntegerType,true),
StructField("Hiring Source", StringType,true),
StructField("Involuntary Termination", StringType,true),
StructField("Involuntary Termination Count", IntegerType,true),
StructField("Is Critical Job", BooleanType,true),
StructField("Is High Loss Impact Risk", BooleanType,true),
StructField("Is High Potential", BooleanType,true),
StructField("Is Manager", BooleanType,true),
StructField("Is Retention Risk", BooleanType,true),
StructField("Job Category", StringType,true),
StructField("Job Code", IntegerType,true),
StructField("Job Family", IntegerType,true),
StructField("Job Family Group", StringType,true),
StructField("Job Profile", StringType,true),
StructField("Length of Service in Years including Partial Year", StringType,true),
StructField("Location", StringType,true),
StructField("Location - Country", StringType,true),
StructField("Loss Impact", StringType,true),
StructField("Management Level", StringType,true),
StructField("Manager", StringType,true),
StructField("Manager Count", IntegerType,true)
));
val dataFrame = spark.createDataFrame(df.rdd, schema)
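// Parquet rejects spaces in column names, so replace them with underscores before writing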
var newDf = dataFrame
for (col <- dataFrame.columns) {
  newDf = newDf.withColumnRenamed(col, col.replaceAll("\\s", "_"))
}
newDf.write.parquet("/output_dir/parquet")
Seems pretty straightforward so far, but I am running into an exception which seems to be about trying to parse a non-int value into an int field.
Here is the exception I am getting:
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)
at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573)
at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:315)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:258)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
... 8 more
Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfFalseExpr22$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_9$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:287)
... 20 more
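If it helps, my current guess at the cause: since no schema is given to the reader, spark-csv reads every column as a String, and createDataFrame(df.rdd, schema) only re-labels the rows without actually casting anything, so the Int/Boolean fields still hold Strings. If that understanding is right, this toy snippet (hypothetical one-column data, not my real CSV) should fail the same way:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// a single row whose only field is the String "42", paired with an int schema
val rows = sc.parallelize(Seq(Row("42")))
val intSchema = StructType(Array(StructField("n", IntegerType, true)))

// accepted without complaint, because the type check is deferred...
val mismatched = spark.createDataFrame(rows, intSchema)

// ...and only fails once an action forces evaluation:
// java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
mismatched.show()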
Am I doing something wrong when applying the schema to the DataFrame? I tried using the "inferSchema" option on sqlContext.read.format(...), but that seems to guess the types incorrectly.
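Would the right fix be to hand the schema to the reader instead, so the CSV parser produces the declared types up front? Something like this sketch (untested, same path as above):

val typedDf = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .schema(schema)  // parse each column directly to its declared type
  .load("/uploads/01ff5191-27c4-42db-a8e0-0d6594de3a5d/Worker_Snapshot_50_100000.csv")

Or alternatively, keeping the all-String read and casting column by column, e.g. df.select(schema.map(f => col(f.name).cast(f.dataType)): _*) with col from org.apache.spark.sql.functions. I am not sure which approach is preferred.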