I am using Spark 2.4 with Hive Warehouse Connector and Scala 2.11. The current Hive Warehouse Connector provided by Hortonworks is not compatible with Spark 2.4. So I compile my jar file from https://github.com/abh1sh2k/spark-llap/pull/1/files which makes it compatible with Spark 2.4.
My Spark application reads from Kafka input stream and writes to Hive table (ORC format) using Hive output stream provided by Hive Warehouse Connector.
Here's my Spark code (Scala):
package example
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.read.confluent.SchemaManager
import za.co.absa.abris.avro.functions.from_confluent_avro
object NormalizedEventsToHive extends Logging {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("NormalizedEventsToHive")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
val schema_registry_config = Map(
"schema.registry.url" -> "http://schema-registry:8081",
"value.schema.naming.strategy" -> "topic.name",
"schema.registry.topic" -> "events-v1",
"value.schema.id" -> "latest"
)
val input_stream_df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("startingOffsets", "earliest")
.option("subscribe", "events-v1")
.load()
val data = input_stream_df
.select(from_confluent_avro(col("value"), schema_registry_config) as 'data)
.select("data.*")
val output_stream_df = data.writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource")
.option("database", "default")
.option("table", "events")
.option("checkpointLocation", "file:///checkpoint2")
.option("metastoreUri", "thrift://hive-metastore:9083")
.start()
output_stream_df.awaitTermination()
}
}
The input Kafka messages are AVRO encoded and Confluent Schema Registry is used for schema version control. za.co.absa.abris.avro.functions.from_confluent_avro is used to decode the AVRO encoded Kafka messages.
Here's the AVRO schema:
{
"type": "record",
"name": "events",
"fields": [
{ "name": "id", "type": ["null", "string"], "default": null },
.....
{ "name": "field_map", "type": ["null", { "type": "map", "values": ["null", "string"] }], "default": null },
{ "name": "field_array", "type": ["null", { "type": "array", "items": "string" }], "default": null },
{ "name": "field_array_of_map", "type": ["null", { "type": "array", "items": { "type": "map", "values": ["null", "string"] }}], "default": null }
]
}
The events Hive table (ORC format) is created as:
CREATE TABLE `events`(
`id` string,
......
`field_map` map<string,string>,
`field_array` array<string>,
`field_array_of_map` array<map<string,string>>
)
CLUSTERED BY(id) INTO 9 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
The fields with array<string>, map<string, string>, array<map<string, string>> types, they are saved wrongly in Hive table.
When SELECT query is issued in Beeline, they show:
field_map {"org.apache.spark.sql.catalyst.expressions.UnsafeMapData@101c5674":null}
field_array ["org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@6b5730c2"]
field_array_of_map [{"org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@ca82f1a4":null}]
From https://github.com/hortonworks-spark/spark-llap, it mentions Array type is supported, although Map isn't. Any idea how to save Array correctly? Any workaround for Map type?
3.1.2- Shuwn Yuan Tee