I am trying to parse Azure Event Hub messages generated from Azure blob file events using spark streaming and scala.
import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
object eventhub {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("Event Hub")
//.config("spark.some.config.option", "some-value")
.master("local")
.getOrCreate()
import spark.implicits._
// Event hub configurations
// Replace values below with yours
val eventHubName = "xxx"
val eventHubNSConnStr = "Endpoint=xxxxx"
val connStr = ConnectionStringBuilder(eventHubNSConnStr).setEventHubName(eventHubName).build
val customEventhubParameters = EventHubsConf(connStr).setMaxEventsPerTrigger(5)
val incomingStream = spark.readStream.format("eventhubs")
.options(customEventhubParameters.toMap).load()
incomingStream.printSchema
val testSchema = new StructType()
//.add("offset", StringType)
//.add("Time", StringType)
//.add("Timestamp", LongType)
.add ("Body", new ArrayType( new StructType()
.add("topic", StringType)
.add("subject", StringType)
.add("eventType", StringType)
.add("eventTime", StringType)
.add("id", StringType)
.add("data", new StructType()
.add("api", StringType)
.add("clientRequestId", StringType)
.add("requestId", StringType)
.add("eTag", StringType)
.add("contentType", StringType)
.add("contentLength", LongType)
.add("blobType", StringType)
.add("url", StringType)
.add("sequencer", StringType)
.add("storageDiagnostics", new StructType()
.add("batchId", StringType)))
.add("dataVersion", StringType)
.add("metadataVersion", StringType), false))
// Event Hub message format is JSON and contains "body" field
// Body is binary, so you cast it to string to see the actual content of the message
val messages = incomingStream.select($"body".cast(StringType)).alias("body")
//.select(explode($"body")).alias("newbody")
.select(from_json($"body",testSchema)).alias("newbody")
.select("newbody.*")
/*
Output of val messages = incomingStream.select($"body".cast(StringType)).alias("body")
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|body |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{"topic":"A1","subject":"A2","eventType":"A3","eventTime":"2019-07-26T17:00:32.4820786Z","id":"1","data":{"api":"PutBlob","clientRequestId":"A4","requestId":"A5","eTag":"A6","contentType":"A7","contentLength":496,"blobType":"BlockBlob","url":"https://test.blob.core.windows.net/test/20190726125719.csv","sequencer":"1","storageDiagnostics":{"batchId":"1"}},"dataVersion":"","metadataVersion":"1"}]|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
*/
messages.writeStream
.outputMode("append")
.format("console")
.option("truncate", false)
.start()
.awaitTermination()
}
}
Here are structures of the original incoming stream and the "body"
root
|-- body: binary (nullable = true)
|-- partition: string (nullable = true)
|-- offset: string (nullable = true)
|-- sequenceNumber: long (nullable = true)
|-- enqueuedTime: timestamp (nullable = true)
|-- publisher: string (nullable = true)
|-- partitionKey: string (nullable = true)
|-- properties: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- systemProperties: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
root
|-- body: string (nullable = true)
Looking at the output of "body", it feels like an array and seem like it need to be exploded but "body" data type is coming out to be string and it is complaining about using "explode" function.
It is not correctly parsing at this time when I am passing the schema as it is string and I am not sure what exactly should be the structure and how I get JSON structure to be parsed. Currently I get NULL output as it is obviously failing on the JSON parsing. Any inputs are appreciated. Thank you for your help.