0 votes

Here is the spark-shell script I am using to convert CSV data to Parquet:

import org.apache.spark.sql.types._;
val sqlContext = new org.apache.spark.sql.SQLContext(sc);
val df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load("/uploads/01ff5191-27c4-42db-a8e0-0d6594de3a5d/Worker_Snapshot_50_100000.csv");
val schema = StructType(Array(
    StructField("CF_EmplID", StringType, true),
    StructField("Worker", StringType, true),
    StructField("CF_Sup Org Level 1", StringType, true),
    StructField("CF_Sup Org Level 2", StringType, true),
    StructField("CF_Sup Org Level 3", StringType, true),
    StructField("CF_Sup Org Level 4", StringType, true),
    StructField("Business Process Transaction", StringType, true),
    StructField("Record Date", StringType, true),
    StructField("CF_Fiscal Period", StringType, true),
    StructField("Business Process Type", StringType, true),
    StructField("Business Process Reason", StringType, true),
    StructField("Active Status", BooleanType, true),
    StructField("Age Group", StringType, true),
    StructField("Annual Base Pay", StringType, true),
    StructField("Base Pay Segment", StringType, true),
    StructField("Compa-Ratio", StringType, true),
    StructField("Company", StringType, true),
    StructField("Compensation Grade", BooleanType, true),
    StructField("Contingent Worker Type", StringType, true),
    StructField("Cost Center", StringType, true),
    StructField("Current Rating", StringType, true),
    StructField("Employee Type", StringType, true),
    StructField("Ending Headcount", IntegerType, true),
    StructField("Ethnicity", StringType, true),
    StructField("Exempt", BooleanType, true),
    StructField("FTE", StringType, true),
    StructField("Gender", StringType, true),
    StructField("Highest Degree", StringType, true),
    StructField("Hire Count", IntegerType, true),
    StructField("Hire Year Text", IntegerType, true),
    StructField("Hiring Source", StringType, true),
    StructField("Involuntary Termination", StringType, true),
    StructField("Involuntary Termination Count", IntegerType, true),
    StructField("Is Critical Job", BooleanType, true),
    StructField("Is High Loss Impact Risk", BooleanType, true),
    StructField("Is High Potential", BooleanType, true),
    StructField("Is Manager", BooleanType, true),
    StructField("Is Retention Risk", BooleanType, true),
    StructField("Job Category", StringType, true),
    StructField("Job Code", IntegerType, true),
    StructField("Job Family", IntegerType, true),
    StructField("Job Family Group", StringType, true),
    StructField("Job Profile", StringType, true),
    StructField("Length of Service in Years including Partial Year", StringType, true),
    StructField("Location", StringType, true),
    StructField("Location - Country", StringType, true),
    StructField("Loss Impact", StringType, true),
    StructField("Management Level", StringType, true),
    StructField("Manager", StringType, true),
    StructField("Manager Count", IntegerType, true)
));


// wrap the rows read from CSV in the custom schema
val dataFrame = spark.createDataFrame(df.rdd, schema)

// replace whitespace in column names with underscores, since Parquet rejects spaces
var newDf = dataFrame
for (col <- dataFrame.columns) {
  newDf = newDf.withColumnRenamed(col, col.replaceAll("\\s", "_"))
}

newDf.write.parquet("/output_dir/parquet")

Seems pretty straightforward so far, but I am running into an exception that seems to be about trying to parse a non-int value into an int field.

Here is the exception I am getting:

  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)
  at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573)
  at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:315)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:258)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
  ... 8 more
Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfFalseExpr22$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_9$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:287)
  ... 20 more

Am I doing something wrong when applying the schema to the DataFrame? I tried the "inferSchema" option on sqlContext.read, but it seems to guess the types incorrectly.

I really suggest you try using inferSchema to build that. – Thiago Baldim
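For reference, inferSchema is an option on the spark-csv reader; a minimal sketch of what the commenter is suggesting (using the question's input path):

  // let the reader sample the file and guess column types instead of supplying a schema
  val inferred = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/uploads/01ff5191-27c4-42db-a8e0-0d6594de3a5d/Worker_Snapshot_50_100000.csv")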

2 Answers

1 vote

Instead of

val dataFrame = spark.createDataFrame(df.rdd, schema)

use:

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header","true")
  .schema(schema)
  .load(...);
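
This works because the CSV source reads every column as a string; createDataFrame(df.rdd, schema) does not cast those strings, so the first IntegerType or BooleanType field fails with "java.lang.String is not a valid external type for schema of int". Supplying the schema to the reader makes it parse each value into the declared type as the file is loaded.

With the schema applied at read time, the rest of the question's pipeline stays the same; a minimal sketch of the continuation (output path taken from the question):

  // rename columns to remove spaces, then write Parquet as before
  val renamed = df.columns.foldLeft(df) { (acc, c) =>
    acc.withColumnRenamed(c, c.replaceAll("\\s", "_"))
  }
  renamed.write.parquet("/output_dir/parquet")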
0 votes

Try comparing the schema of df with your custom schema, and cast each column so the data types in the two schemas match.
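
A minimal sketch of that cast-to-match approach (the foldLeft loop is illustrative, not from the original answer; it assumes every field in the custom schema exists in df):

  import org.apache.spark.sql.functions.col

  // cast each column of the all-string DataFrame to the type declared in the custom schema
  val casted = schema.fields.foldLeft(df) { (acc, field) =>
    acc.withColumn(field.name, col(field.name).cast(field.dataType))
  }
  casted.printSchema() // verify the resulting types match the custom schema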