I have an Azure Databricks script in Python that reads JSON messages from Event Hub using Structured Streaming, processes the messages and saves the results in Data Lake Store. The messages are sent to the Event Hub from an Azure Logic App that reads tweets from the Twitter API.
I am trying to deserialize the body of the Event Hub message in order to process its contents. The message body is first converted from binary to a string value and then deserialized to a struct type using the from_json function, as explained in this article: https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
Here is a code example (with obfuscated parameters):
from pyspark.sql.functions import from_json, to_json
from pyspark.sql.types import DateType, StringType, StructType
EVENT_HUB_CONN_STRING = 'Endpoint=sb://myehnamespace.servicebus.windows.net/;SharedAccessKeyName=Listen;SharedAccessKey=xxx;EntityPath=myeh'
OUTPUT_DIR = '/mnt/DataLake/output'
CHECKPOINT_DIR = '/mnt/DataLake/checkpoint'
event_hub_conf = {
    'eventhubs.connectionString': EVENT_HUB_CONN_STRING
}
stream_data = spark \
    .readStream \
    .format('eventhubs') \
    .options(**event_hub_conf) \
    .option('multiLine', True) \
    .option('mode', 'PERMISSIVE') \
    .load()
schema = StructType() \
    .add('FetchTimestampUtc', DateType()) \
    .add('Username', StringType()) \
    .add('Name', StringType()) \
    .add('TweetedBy', StringType()) \
    .add('Location', StringType()) \
    .add('TweetText', StringType())
# The Event Hub message body arrives as binary, so cast it to a string
# before parsing it with from_json.
stream_data_body = stream_data \
    .select(stream_data.body.cast('string').alias('body')) \
    .select(from_json('body', schema).alias('body')) \
    .select(to_json('body').alias('body'))
# This works (bare string value, no deserialization):
# stream_data_body = stream_data.select(stream_data.body.cast('string'))
stream_data_body \
    .writeStream \
    .outputMode('append') \
    .format('json') \
    .option('path', OUTPUT_DIR) \
    .option('checkpointLocation', CHECKPOINT_DIR) \
    .start() \
    .awaitTermination()
I am not actually doing any processing here yet, just a trivial deserialization/serialization round trip.
The above script does produce output to Data Lake, but the resulting JSON objects are empty. Here is an example of the output:
{}
{}
{}
The commented-out line in the script does produce output, but it is just the raw string value, since no deserialization is applied:
{"body":"{\"FetchTimestampUtc\": 2018-10-16T09:21:40.6173187Z, \"Username\": ... }}
I was wondering whether the backslashes should be doubled, as in the example given in the article linked above. This might be doable with the options parameter of the from_json function, which is documented as "options to control parsing. accepts the same options as the json datasource." However, I have not found documentation for the format of those options.
Any ideas why the deserialization/serialization is not working?