5
votes

I have an Azure Databricks script in Python that reads JSON messages from Event Hub using Structured Streaming, processes the messages and saves the results in Data Lake Store. The messages are sent to the Event Hub from an Azure Logic App that reads tweets from the Twitter API.

I am trying to deserialize the body of the Event Hub message in order to process its contents. The message body is first converted from binary to a string value and then deserialized to a struct type using the from_json function, as explained in this article: https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html

Here is a code example (with obfuscated parameters):

from pyspark.sql.functions import from_json, to_json
from pyspark.sql.types import DateType, StringType, StructType

EVENT_HUB_CONN_STRING = 'Endpoint=sb://myehnamespace.servicebus.windows.net/;SharedAccessKeyName=Listen;SharedAccessKey=xxx;EntityPath=myeh'
OUTPUT_DIR = '/mnt/DataLake/output'
CHECKPOINT_DIR = '/mnt/DataLake/checkpoint'

event_hub_conf = {
    'eventhubs.connectionString' : EVENT_HUB_CONN_STRING
}

stream_data = spark \
    .readStream \
    .format('eventhubs') \
    .options(**event_hub_conf) \
    .option('multiLine', True) \
    .option('mode', 'PERMISSIVE') \
    .load()

schema = StructType() \
    .add('FetchTimestampUtc', DateType()) \
    .add('Username', StringType()) \
    .add('Name', StringType()) \
    .add('TweetedBy', StringType()) \
    .add('Location', StringType()) \
    .add('TweetText', StringType())

stream_data_body = stream_data \
    .select(stream_data.body) \
    .select(from_json('body', schema).alias('body')) \
    .select(to_json('body').alias('body'))

# This works (bare string value, no deserialization):
# stream_data_body = stream_data.select(stream_data.body)

stream_data_body \
    .writeStream \
    .outputMode('append') \
    .format('json') \
    .option('path', OUTPUT_DIR) \
    .option('checkpointLocation', CHECKPOINT_DIR) \
    .start() \
    .awaitTermination()

Here I am not actually doing any processing yet, just a trivial deserialization/serialization.

The above script does produce output to Data Lake, but the result JSON objects are empty. Here is an example of the output:

{}
{}
{}

The commented code in the script does produce output, but this is just the string value since we did not include deserialization:

{"body":"{\"FetchTimestampUtc\": 2018-10-16T09:21:40.6173187Z, \"Username\": ... }}

I was wondering whether the backslashes should be doubled, as in the example given in the link above. This might be doable with the options parameter of the from_json function, which is documented as: "options to control parsing. accepts the same options as the json datasource." However, I have not found documentation for the format of these options.
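If I read the docstring correctly, the options would be passed as a plain Python dict using the same option names as the JSON datasource, something like the following (untested sketch; allowBackslashEscapingAnyCharacter is only an example option name):

stream_data_body = stream_data \
    .select(stream_data.body) \
    .select(from_json('body', schema,
                      {'allowBackslashEscapingAnyCharacter': 'true'}).alias('body')) \
    .select(to_json('body').alias('body'))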

Any ideas why the deserialization/serialization is not working?

The json section here spark.apache.org/docs/2.2.1/api/java/org/apache/spark/sql/… contains all the options you can apply to JSON readers in Spark. One intriguing one is allowBackslashEscapingAnyCharacter but I'll leave it up to you to tell us which one works for your case ... - Kyle Hale
PS options are passed as JSON too e.g. {'allowUnquotedFieldNames':'true'} - Kyle Hale
Thanks for the link @KyleHale! I tried with 'allowUnquotedFieldNames':'true' but it did not solve the issue. I am pretty sure the issue is related to the schema definition, and I have also been in contact with the Databricks team about this, but I still haven't been able to pin down what the problem is exactly, since there is no error message. :/ - Lazer

1 Answer

3
votes

It appears that the input JSON must have a specific syntax. The field values must be strings; unquoted timestamps are not allowed (and perhaps the same goes for integers, floats etc.). Any type conversion must be done inside the Databricks script.

I changed the input JSON so that the timestamp value is quoted. In the schema, I also changed DateType to TimestampType (which is more appropriate), NOT to StringType.
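For reference, the adjusted schema looks roughly like this (a sketch; the only change compared to the question is the type of the timestamp field):

from pyspark.sql.types import StringType, StructType, TimestampType

schema = StructType() \
    .add('FetchTimestampUtc', TimestampType()) \
    .add('Username', StringType()) \
    .add('Name', StringType()) \
    .add('TweetedBy', StringType()) \
    .add('Location', StringType()) \
    .add('TweetText', StringType())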

By using the following select expression:

# Cast the binary body to a string before parsing it with from_json.
stream_data_body = stream_data \
    .select(from_json(stream_data.body.cast('string'), schema).alias('body')) \
    .select(to_json('body').alias('body'))

the following output is produced in the output file:

{"body":"{\"FetchTimestampUtc\":\"2018-11-29T21:26:40.039Z\",\"Username\":\"xyz\",\"Name\":\"x\",\"TweetedBy\":\"xyz\",\"Location\":\"\",\"TweetText\":\"RT @z123: I just want to say thanks to everyone who interacts with me, whether they talk or they just silently rt or like, thats okay.…\"}"}

which is more or less the expected result, although the timestamp is output as a string. In fact, the whole body object is output as a string, since to_json serializes the parsed struct back into JSON text.
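If separate, typed columns are preferred over a single JSON string, I believe a small variation like the following should work (untested sketch), expanding the struct instead of re-serializing it with to_json:

# Expand the parsed struct into top-level columns instead of calling to_json.
stream_data_fields = stream_data \
    .select(from_json(stream_data.body.cast('string'), schema).alias('body')) \
    .select('body.*')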

I didn't manage to get the ingestion working if the input format is proper JSON with native field types. The output of from_json is always null in that case.

EDIT: This seems to have been confusion on my part. Date values should always be quoted in JSON, as they are not "native" types.

I have tested that integer and float values can be passed without quotes, so it is possible to do calculations with them.
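As an illustration, here is a minimal batch sketch of what I mean (the field names RetweetCount and Sentiment are made up for this example):

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import DoubleType, IntegerType, StructType

# Numeric fields left unquoted in the JSON parse into numeric columns.
numeric_schema = StructType() \
    .add('RetweetCount', IntegerType()) \
    .add('Sentiment', DoubleType())

df = spark.createDataFrame([('{"RetweetCount": 42, "Sentiment": 0.75}',)], ['body'])

# Calculations can then be done directly on the parsed columns.
parsed = df \
    .select(from_json('body', numeric_schema).alias('body')) \
    .select((col('body.RetweetCount') * col('body.Sentiment')).alias('weighted'))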