I'm working with Spark's Structured Streaming (2.2.1), using Kafka to receive data from sensors every 60 seconds. I'm having trouble wrapping my head around how to package this Kafka data so that I can process it correctly as it comes in.
I need to be able to do some calculations as the data comes in with Kafka.
My issue is unpacking the JSON data coming from Kafka into datasets I can work with.
Data
A simplified version of the data looks something like this:
{
  "id": 1,
  "timestamp": "timestamp",
  "pump": {
    "current": 1.0,
    "flow": 20.0,
    "torque": 5.0
  },
  "reactors": [
    {
      "id": 1,
      "status": 200
    },
    {
      "id": 2,
      "status": 300
    }
  ],
  "settings": {
    "pumpTimer": 20.0,
    "reactorStatusTimer": 200.0
  }
}
In order to be able to work with this in Spark, I've created case class structures for each of these objects:
// First, the top-level record; nested objects are kept as raw JSON strings to be parsed in a second pass
case class RawData(id: String, timestamp: String, pump: String, reactors: Array[String], settings: String)
// Each of the objects from the data
case class Pump(current: Float, flow: Float, torque: Float)
case class Reactor(id: Int, status: Int)
case class Settings(oos: Boolean, pumpTimer: Float, reactorStatusTimer: Float)
And generating the schema using:
val rawDataSchema = Encoders.product[RawData].schema
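The schemas for the nested objects (pumpSchema and settingsSchema, used below) can be generated the same way from their case classes, e.g.:
import org.apache.spark.sql.Encoders

// Schemas for the nested JSON objects, derived from the case classes above
val pumpSchema = Encoders.product[Pump].schema
val settingsSchema = Encoders.product[Settings].schema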
Raw data to Spark Schema
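For context, df below is the streaming DataFrame read from the Kafka source, roughly like this (the broker address and topic name are just placeholders):
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("SensorStream") // placeholder app name
  .getOrCreate()
import spark.implicits._

// Streaming source: one Kafka record per sensor message
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "sensors")
  .load()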
First, I parse the 'value' field from Kafka using my general schema:
val rawDataSet = df.select($"value" cast "string" as "json")
  .select(from_json($"json", rawDataSchema) as "data")
  .select("data.*").as[RawData]
Using this rawDataSet, I can parse each of the individual nested objects into its own dataset:
val pump = rawDataSet.select(from_json($"pump", pumpSchema) as 'pumpData)
  .select("pumpData.*").as[Pump]

val settings = rawDataSet.select(from_json($"settings", settingsSchema) as 'settingsData)
  .select("settingsData.*").as[Settings]
And this gives me nice and clean datasets per JSON object.
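(For debugging, these intermediate datasets can be inspected with a console sink; a minimal sketch, with output mode chosen just for illustration:)
// Print the parsed pump stream to the console (debug only)
val query = pump.writeStream
  .format("console")
  .outputMode("append")
  .start()

query.awaitTermination()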
Working with the data
Here is my issue: if I want to, for example, compare or calculate some values between the two datasets for Settings and Pump, a JOIN does not work with Structured Streaming.
val joinedData = pump.join(settings)
Error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported;
Is my approach for this all wrong? Or are there any recommendations for alternative ways to handle this?
Thanks