4
votes

I'm loading Parquet files from Databricks into Spark:

val dataset = context.session.read.parquet(parquetPath)

Then I perform some transformations like this:

val df = dataset.withColumn(
  columnName,
  concat_ws("", col(data.columnName), lit(textToAppend)))

When I try to save it as JSON to Kafka (not back to parquet!):

val kafkaDf = df.select(
  lit("databricks").alias("source"),
  struct("*").alias("data"))

val server = "kafka.dev.server" // some url
kafkaDf.selectExpr("to_json(struct(*)) AS value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", server)
  .option("topic", topic)
  .save()

I get the following exception:

org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file dbfs:/mnt/warehouse/part-00001-tid-4198727867000085490-1e0230e7-7ebc-4e79-9985-0a131bdabee2-4-c000.snappy.parquet. Column: [item_group_id], Expected: StringType, Found: INT32
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:310)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:287)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
    at com.databricks.sql.io.parquet.NativeColumnReader.readBatch(NativeColumnReader.java:448)
    at com.databricks.sql.io.parquet.DatabricksVectorizedParquetRecordReader.nextBatch(DatabricksVectorizedParquetRecordReader.java:330)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:167)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:40)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:299)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:287)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

This only happens when I read multiple partitions. For example, the /mnt/warehouse/ directory contains many Parquet files, each holding data for one datestamp. If I read just one of them I get no exception, but reading the whole directory raises the exception above.

It happens when I apply a transformation like the one above, where I change the data type of a column. How can I fix this? I'm not trying to write back to Parquet; I want to transform all files from the same source schema to a new schema and write them to Kafka.

2
If you want to write to a Kafka topic, a solution might be to use a producer. – Robert Reynolds
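A minimal sketch of that suggestion, assuming a plain KafkaProducer, the transformed df from the question and the question's server and topic placeholders; collecting the rows to the driver is only reasonable for small datasets:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "kafka.dev.server")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)

// Serialize each row to JSON on the driver and send it to the topic
df.toJSON.collect().foreach(json => producer.send(new ProducerRecord[String, String](topic, json)))
producer.close()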

2 Answers

4
votes

There seems to be an issue with the Parquet files themselves. The item_group_id column is not stored with the same data type in all files: some files have it as String and others as Integer. The source code of the exception, SchemaColumnConvertNotSupportedException, gives this description:

Exception thrown when the parquet reader find column type mismatches.

A simple way to replicate the problem can be found among the tests for Spark on GitHub:

Seq(("bcd", 2)).toDF("a", "b").coalesce(1).write.mode("overwrite").parquet(s"$path/parquet")
Seq((1, "abc")).toDF("a", "b").coalesce(1).write.mode("append").parquet(s"$path/parquet")

spark.read.parquet(s"$path/parquet").collect()

Of course, this only happens when multiple files are read at once, or, as in the test above, when more data has been appended to the same location. If a single file is read, there is no mismatch between the data types of a column.


The easiest way to fix the problem is to make sure the column types are correct in all files at the time the files are written.
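If the job that produces the files in /mnt/warehouse/ is under your control, a sketch of that fix would be to cast the offending column before the write (sourceDf and the output path are hypothetical placeholders, not shown in the question):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StringType

// Cast item_group_id to the expected type so every file ends up with the same schema
sourceDf
  .withColumn("item_group_id", col("item_group_id").cast(StringType))
  .write
  .mode("append")
  .parquet("dbfs:/mnt/warehouse/")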

The alternative is to read all the Parquet files separately, adjust their schemas to match, and then combine them with union:

import org.apache.spark.sql.functions.col

// Specify the files and read them as separate dataframes
val files = Seq(...)
val dfs = files.map(file => spark.read.parquet(file))

// Specify the schema (here the schema of the first file is used)
val schema = dfs.head.schema

// Create new columns with the correct names and types
val newCols = schema.map(c => col(c.name).cast(c.dataType))

// Select the new columns and merge the dataframes
val df = dfs.map(_.select(newCols: _*)).reduce(_ union _)
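With the schemas aligned, df can then be fed into the Kafka write from the question. A rough sketch, reusing the question's placeholders for the server and topic (lit and struct come from org.apache.spark.sql.functions):

import org.apache.spark.sql.functions.{lit, struct}

df.select(lit("databricks").alias("source"), struct("*").alias("data"))
  .selectExpr("to_json(struct(*)) AS value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka.dev.server")
  .option("topic", topic)
  .save()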
0
votes

You can find the instructions at this link.

It presents the different ways to write data to a Kafka topic.
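The question already shows the batch write; for completeness, a hedged sketch of the streaming variant (paths, server and topic names are assumed placeholders, not taken from the linked page):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Streaming file sources need an explicit schema, so borrow it from a batch read
val schema = spark.read.parquet("dbfs:/mnt/warehouse/").schema

spark.readStream
  .schema(schema)
  .parquet("dbfs:/mnt/warehouse/")
  .selectExpr("to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka.dev.server")
  .option("topic", "my-topic")
  .option("checkpointLocation", "dbfs:/tmp/kafka-checkpoint")
  .start()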