3
votes

I am using Spark 2.4 with the Hive Warehouse Connector and Scala 2.11. The current Hive Warehouse Connector provided by Hortonworks is not compatible with Spark 2.4, so I compiled my own jar from https://github.com/abh1sh2k/spark-llap/pull/1/files, which makes it compatible with Spark 2.4.
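
For context, my application build looks roughly like this (a sketch only: the ABRiS version and the exact jar handling are placeholders, not necessarily what I actually use):

// build.sbt (sketch)
scalaVersion := "2.11.12"

// ABRiS pulls Confluent Schema Registry client artifacts hosted on Confluent's Maven repository
resolvers += "confluent" at "https://packages.confluent.io/maven/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "2.4.0" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0",
  "za.co.absa"       %% "abris"                % "3.2.1"  // placeholder version
)

// The Hive Warehouse Connector assembly jar built from the pull request above is
// dropped into lib/, which sbt picks up automatically as an unmanaged dependency.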

My Spark application reads from a Kafka input stream and writes to a Hive table (ORC format) using the Hive streaming data source provided by the Hive Warehouse Connector.

Here's my Spark code (Scala):

package example

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.read.confluent.SchemaManager
import za.co.absa.abris.avro.functions.from_confluent_avro

object NormalizedEventsToHive extends Logging {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("NormalizedEventsToHive")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    // ABRiS configuration: resolve the latest value schema for the topic from the Confluent Schema Registry
    val schema_registry_config = Map(
      "schema.registry.url"           -> "http://schema-registry:8081",
      "value.schema.naming.strategy"  -> "topic.name",
      "schema.registry.topic"         -> "events-v1",
      "value.schema.id"               -> "latest"
    )

    // Read the Avro-encoded messages from the Kafka topic
    val input_stream_df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("startingOffsets", "earliest")
      .option("subscribe", "events-v1")
      .load()

    // Decode the Kafka value column with ABRiS and flatten the record fields into top-level columns
    val data = input_stream_df
      .select(from_confluent_avro(col("value"), schema_registry_config) as 'data)
      .select("data.*")

    // Write the stream to the transactional Hive table via the Hive Warehouse Connector streaming data source
    val output_stream_df = data.writeStream
      .format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource")
      .option("database", "default")
      .option("table", "events")
      .option("checkpointLocation", "file:///checkpoint2")
      .option("metastoreUri", "thrift://hive-metastore:9083")
      .start()

    output_stream_df.awaitTermination()
  }
}

The input Kafka messages are Avro-encoded, and the Confluent Schema Registry is used for schema version control. za.co.absa.abris.avro.functions.from_confluent_avro is used to decode the Avro-encoded Kafka messages.

Here's the Avro schema:

{
    "type": "record",
    "name": "events",
    "fields": [
        { "name": "id",                 "type": ["null", "string"], "default": null },
        .....
        { "name": "field_map",          "type": ["null", { "type": "map", "values": ["null", "string"] }], "default": null },
        { "name": "field_array",        "type": ["null", { "type": "array", "items": "string" }], "default": null },
        { "name": "field_array_of_map", "type": ["null", { "type": "array", "items": { "type": "map", "values": ["null", "string"] }}], "default": null }
    ]
}
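
For reference, after from_confluent_avro and select("data.*"), the streaming DataFrame should end up with roughly the following Spark schema for the fields shown above (a sketch of the usual Avro-to-Spark mapping, abbreviated like the schema itself; nullability details may differ slightly):

import org.apache.spark.sql.types._

// Rough Spark-side schema for the fields shown above; the ["null", ...] unions become nullable columns
val expectedSchema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("field_map", MapType(StringType, StringType, valueContainsNull = true), nullable = true),
  StructField("field_array", ArrayType(StringType, containsNull = false), nullable = true),
  StructField("field_array_of_map", ArrayType(MapType(StringType, StringType, valueContainsNull = true), containsNull = false), nullable = true)
))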

The events Hive table (ORC format) is created as:

CREATE TABLE `events`(
  `id`                  string,
  ......
  `field_map`           map<string,string>,
  `field_array`         array<string>,
  `field_array_of_map`  array<map<string,string>>
)
CLUSTERED BY(id) INTO 9 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');

The fields with array<string>, map<string,string> and array<map<string,string>> types are saved incorrectly to the Hive table.

When a SELECT query is issued in Beeline, they show up as:

field_map               {"org.apache.spark.sql.catalyst.expressions.UnsafeMapData@101c5674":null}
field_array             ["org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@6b5730c2"]
field_array_of_map      [{"org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@ca82f1a4":null}]

According to https://github.com/hortonworks-spark/spark-llap, the Array type is supported, although Map isn't. Any idea how to save Array correctly? Is there any workaround for the Map type?

1
What is the version of Hive that you are using? - lev
@lev The Hive version used is 3.1.2 - Shuwn Yuan Tee
Can you include the Avro schema? @ShuwnYuanTee - hlagos
@hlagos I edited my question to include my Avro schema, thanks. - Shuwn Yuan Tee

1 Answer

0
votes

The changes found in the HWC GitHub repository pull request worked for me with Structured Streaming.

What I did:

  1. Cloned the @massoudm branch.
  2. In the project root directory, I ran sbt assembly.
  3. I used the newly created HWC jar.
  4. My code:

data
  .writeStream
  .queryName(config("stream.name") + "_query")
  .options(hiveConfig)
  .option("writer", "json")
  .format(HiveWarehouseSession.STREAM_TO_STREAM)
  .outputMode("append")
  .start()

  5. The most important parts are:

.option("writer", "json")
.format(HiveWarehouseSession.STREAM_TO_STREAM)

This is the link to the pull request.
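
Applied to the data DataFrame from the question, the write side would then look roughly like this (a sketch only; I'm assuming HiveWarehouseSession here is the com.hortonworks.hwc.HiveWarehouseSession class shipped with the HWC jar, and I've kept the question's database/table/metastore/checkpoint options):

import com.hortonworks.hwc.HiveWarehouseSession

// Sketch: the question's writeStream with the two changes highlighted above applied
val query = data.writeStream
  .format(HiveWarehouseSession.STREAM_TO_STREAM)  // HWC constant for its streaming data source, instead of the hard-coded class name
  .option("writer", "json")                       // per the answer, this option is what makes the write work correctly
  .option("database", "default")
  .option("table", "events")
  .option("metastoreUri", "thrift://hive-metastore:9083")
  .option("checkpointLocation", "file:///checkpoint2")
  .start()

query.awaitTermination()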