1
votes

I'm trying to analyze data using Kinesis source in PySpark Structured Streaming on Databricks.

I created a Dataframe as shown below.

kinDF = spark.readStream.format("kinesis").("streamName", "test-stream-1").load()

Later I converted the data from base64 encoding as below.

df =  kinDF.withColumn("xml_data", expr("CAST(data as string)"))

Now, I need to extract few fields from df.xml_data column using xpath. Can you please suggest any possible solution?

If I create a dataframe directly for these xml files as xml_df = spark.read.format("xml").options(rowTag='Consumers').load("s3a://bkt/xmldata"), I'm able to query using xpath:

xml_df.select("Analytics.Amount1").show()

But, not sure how to do extract elements similarly on a Spark Streaming dataframe where data is in text format.

Are there any xml functions to convert text data using schema? I saw an example for json data using from_json.

Is it possible to use spark.read on a dataframe column?

I need to find aggregated "Amount1" for every 5 minutes window.

Thanks for your help

1

1 Answers

0
votes

You can use com.databricks.spark.xml.XmlReader to read xml data from column but requires an RDD, which means that you need to transform your df to RDD using df.rdd which may impact performance.

Below is untested code from spark java:

import com.databricks.spark.xml

xmlRdd = df = kinDF.select("xml_data").map(r -> r[0])
new XmlReader().xmlRdd(spark, xmlRdd)