I'm loading parquet files from Databricks to Spark:
val dataset = context.session.read.parquet(parquetPath)
Then I perform some transformations like this:
val df = dataset.withColumn(
  columnName,
  concat_ws("", col(columnName), lit(textToAppend)))
When I try to save it as JSON to Kafka (not back to parquet!):
val wrapped = df.select(
  lit("databricks").alias("source"),
  struct("*").alias("data"))

val server = "kafka.dev.server" // some url
val json = wrapped.selectExpr("to_json(struct(*)) AS value")

json.write
  .format("kafka")
  .option("kafka.bootstrap.servers", server)
  .option("topic", topic)
  .save()
I get the following exception:
org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file dbfs:/mnt/warehouse/part-00001-tid-4198727867000085490-1e0230e7-7ebc-4e79-9985-0a131bdabee2-4-c000.snappy.parquet. Column: [item_group_id], Expected: StringType, Found: INT32
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:310)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:287)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
at com.databricks.sql.io.parquet.NativeColumnReader.readBatch(NativeColumnReader.java:448)
at com.databricks.sql.io.parquet.DatabricksVectorizedParquetRecordReader.nextBatch(DatabricksVectorizedParquetRecordReader.java:330)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:167)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:40)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:299)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:287)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
This only happens when I read multiple partitions. For example, the /mnt/warehouse/ directory contains many parquet files, each holding the data for a single datestamp. If I read just one of them there is no exception, but reading the whole directory raises the error above.
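One way to pin down which files disagree on item_group_id would be to read each file on its own and print the type it reports. A rough sketch of that check (the dbutils.fs.ls listing and the .parquet suffix filter are assumptions about the directory layout):

val files = dbutils.fs.ls("/mnt/warehouse/")
  .map(_.path)
  .filter(_.endsWith(".parquet"))

// Read one file at a time so Spark never merges schemas, then report the
// type that each file actually stores for item_group_id.
files.foreach { path =>
  val fileSchema = context.session.read.parquet(path).schema
  fileSchema.find(_.name == "item_group_id")
    .foreach(field => println(s"$path -> ${field.dataType}"))
}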
The error appears as soon as I apply a transformation like the one above, where the data type of a column changes. How can I fix this? I'm not trying to write back to parquet; I want to transform all files from the same source schema into a new schema and write them to Kafka.
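A per-file workaround I've been considering, though I haven't verified that it avoids the vectorized reader error: read each file separately so Spark never has to reconcile the schemas, cast item_group_id to string, and union the results before the Kafka write. The dbutils.fs.ls listing and the cast are assumptions on my part:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Read every file with its own self-consistent schema, normalize the
// mismatched column to string, then union everything by column name.
val paths = dbutils.fs.ls("/mnt/warehouse/")
  .map(_.path)
  .filter(_.endsWith(".parquet"))

val unified: DataFrame = paths
  .map(p => context.session.read.parquet(p)
    .withColumn("item_group_id", col("item_group_id").cast("string")))
  .reduce(_ unionByName _)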