1
votes

I am trying to parse Azure Event Hub messages generated from Azure blob file events using spark streaming and scala.

import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._


object eventhub {
  def main(args: Array[String]): Unit = {
val spark = SparkSession
  .builder()
  .appName("Event Hub")
  //.config("spark.some.config.option", "some-value")
  .master("local")
  .getOrCreate()
import spark.implicits._


// Event hub configurations
// Replace values below with yours
val eventHubName = "xxx"
val eventHubNSConnStr = "Endpoint=xxxxx"
val connStr = ConnectionStringBuilder(eventHubNSConnStr).setEventHubName(eventHubName).build

val customEventhubParameters = EventHubsConf(connStr).setMaxEventsPerTrigger(5)
val incomingStream = spark.readStream.format("eventhubs")
                    .options(customEventhubParameters.toMap).load()
incomingStream.printSchema



val testSchema = new StructType()
  //.add("offset", StringType)
  //.add("Time", StringType)
  //.add("Timestamp", LongType)
  .add ("Body", new ArrayType( new StructType()
  .add("topic", StringType)
  .add("subject", StringType)
  .add("eventType", StringType)
  .add("eventTime", StringType)
  .add("id", StringType)
  .add("data", new StructType()
    .add("api", StringType)
    .add("clientRequestId", StringType)
    .add("requestId", StringType)
    .add("eTag", StringType)
    .add("contentType", StringType)
    .add("contentLength", LongType)
    .add("blobType", StringType)
    .add("url", StringType)
    .add("sequencer", StringType)
    .add("storageDiagnostics", new StructType()
      .add("batchId", StringType)))
  .add("dataVersion", StringType)
  .add("metadataVersion", StringType), false))



 // Event Hub message format is JSON and contains "body" field
 // Body is binary, so you cast it to string to see the actual content of the message

val messages = incomingStream.select($"body".cast(StringType)).alias("body")
  //.select(explode($"body")).alias("newbody")
  .select(from_json($"body",testSchema)).alias("newbody")
    .select("newbody.*")

/*
Output of val messages = incomingStream.select($"body".cast(StringType)).alias("body")

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|body                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{"topic":"A1","subject":"A2","eventType":"A3","eventTime":"2019-07-26T17:00:32.4820786Z","id":"1","data":{"api":"PutBlob","clientRequestId":"A4","requestId":"A5","eTag":"A6","contentType":"A7","contentLength":496,"blobType":"BlockBlob","url":"https://test.blob.core.windows.net/test/20190726125719.csv","sequencer":"1","storageDiagnostics":{"batchId":"1"}},"dataVersion":"","metadataVersion":"1"}]|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

*/
    messages.writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", false)
      .start()
      .awaitTermination()

  }

}

Here are structures of the original incoming stream and the "body"

root
 |-- body: binary (nullable = true)
 |-- partition: string (nullable = true)
 |-- offset: string (nullable = true)
 |-- sequenceNumber: long (nullable = true)
 |-- enqueuedTime: timestamp (nullable = true)
 |-- publisher: string (nullable = true)
 |-- partitionKey: string (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- systemProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

root
 |-- body: string (nullable = true)

Looking at the output of "body", it feels like an array and seem like it need to be exploded but "body" data type is coming out to be string and it is complaining about using "explode" function.

It is not correctly parsing at this time when I am passing the schema as it is string and I am not sure what exactly should be the structure and how I get JSON structure to be parsed. Currently I get NULL output as it is obviously failing on the JSON parsing. Any inputs are appreciated. Thank you for your help.

1

1 Answers

0
votes

Based on the output of body printed above, it appears there is no element with name Body, which is the reason why it is returning null, Please use the modified schema definition below, it should help.

val testSchema = new StructType()
      .add("topic", StringType)
      .add("subject", StringType)
      .add("eventType", StringType)
      .add("eventTime", StringType)
      .add("id", StringType)
      .add("data", new StructType()
        .add("api", StringType)
        .add("clientRequestId", StringType)
        .add("requestId", StringType)
        .add("eTag", StringType)
        .add("contentType", StringType)
        .add("contentLength", LongType)
        .add("blobType", StringType)
        .add("url", StringType)
        .add("sequencer", StringType)
        .add("storageDiagnostics", new StructType()
          .add("batchId", StringType)))
      .add("dataVersion", StringType)
      .add("metadataVersion", StringType)

If your input payload contains more than one object in the array, then from_json with the above schema will return null. If you expect more than one object in your array then below schema should help.

 val testSchema = new ArrayType(new StructType()
  .add("topic", StringType)
  .add("subject", StringType)
  .add("eventType", StringType)
  .add("eventTime", StringType)
  .add("id", StringType)
  .add("data", new StructType()
    .add("api", StringType)
    .add("clientRequestId", StringType)
    .add("requestId", StringType)
    .add("eTag", StringType)
    .add("contentType", StringType)
    .add("contentLength", LongType)
    .add("blobType", StringType)
    .add("url", StringType)
    .add("sequencer", StringType)
    .add("storageDiagnostics", new StructType()
      .add("batchId", StringType)))
  .add("dataVersion", StringType)
  .add("metadataVersion", StringType),false)