7
votes

I'm working with Spark Structured Streaming (2.2.1), using Kafka to receive data from sensors every 60 seconds. I'm having trouble wrapping my head around how to package this Kafka data so I can process it correctly as it comes in.

I need to be able to do some calculations as the data comes in with Kafka.

My issue is unpacking the JSON data coming from Kafka into datasets I can work with.
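For reference, df in the snippets below is assumed to come from a Kafka source along these lines (the broker address and topic name are placeholders, not my actual setup):

// Assumed Kafka source; broker address and topic name are placeholders
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "sensor-topic")
  .load()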

Data

A simplified data point looks something like this:

{
  "id": 1,
  "timestamp": "timestamp",
  "pump": {
    "current": 1.0,
    "flow": 20.0,
    "torque": 5.0
  },
  "reactors": [
    {
      "id": 1,
      "status": 200
    },
    {
      "id": 2,
      "status": 300
    }
  ],
  "settings": {
    "pumpTimer": 20.0,
    "reactorStatusTimer": 200.0
  }
}

In order to work with this in Spark, I've created case classes for each of these objects:

// First, general package
case class RawData(id: String, timestamp: String, pump: String, reactors: Array[String], settings: String)
// Each of the objects from the data
case class Pump(current: Float, flow: Float, torque: Float)
case class Reactor(id: Int, status: Int)
case class Settings(oos: Boolean, pumpTimer: Float, reactorStatusTimer: Float)

And generating the schema using:

val rawDataSchema = Encoders.product[RawData].schema
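(The snippets in this post assume the usual imports; roughly:)

// Assumed imports for the snippets below
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions._ // from_json, explode, ...
import spark.implicits._                // $-syntax and case class encoders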

Raw data to Spark Schema

First, I parse the 'value' field from Kafka into my general schema:

val rawDataSet = df.select($"value" cast "string" as "json")
  .select(from_json($"json", rawDataSchema) as 'data)
  .select("data.*").as[RawData]

Using this rawDataSet, I can package each of the individual objects into datasets.

// Schemas for the nested objects, derived the same way as rawDataSchema
val pumpSchema = Encoders.product[Pump].schema
val settingsSchema = Encoders.product[Settings].schema

val pump = rawDataSet.select(from_json($"pump", pumpSchema) as 'pumpData)
  .select("pumpData.*").as[Pump]

val settings = rawDataSet.select(from_json($"settings", settingsSchema) as 'settingsData)
  .select("settingsData.*").as[Settings]

And this gives me nice and clean datasets per JSON object.
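To sanity-check these during development, each can be written to a console sink, for example:

// Quick inspection during development: print the parsed pump rows to the console
pump.writeStream
  .outputMode("append")
  .format("console")
  .start()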

Working with the data

Here is my issue: if I want to, for example, compare or calculate values between the Settings and Pump datasets, a JOIN does not work under Structured Streaming.

val joinedData = pump.join(settings)

Error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported;

Is my approach all wrong? Or are there any recommendations for alternative ways to handle this?
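(For what it's worth: since pump and settings arrive in the same Kafka message, they can also be parsed out of the same row rather than joined as two separate streams. A rough sketch, staying with the string-based RawData above; the answer below takes this idea further with nested case classes.)

// Sketch: parse both objects from the same row, so no stream-stream join is needed
val combined = rawDataSet
  .select(
    from_json($"pump", pumpSchema) as 'pumpData,
    from_json($"settings", settingsSchema) as 'settingsData)
  .select($"pumpData.current", $"settingsData.pumpTimer")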

Thanks


1 Answer

2
votes

I'll answer my own question with my now-working solution.

Instead of making separate case classes for each of the objects within the JSON, I could connect them together as one case class with nested objects, like so:

case class RawData(
  id: String, 
  timestamp: String, 
  pump: Pump, 
  reactors: Array[Reactor], 
  settings: Settings
)

case class Pump(current: Float, flow: Float, torque: Float)
case class Reactor(id: Int, status: Int)
case class Settings(oos: Boolean, pumpTimer: Float, reactorStatusTimer: Float)

To make this into a usable Dataset, I could simply call:

val rawDataset = df.select($"value" cast "string" as "json")
  .select(from_json($"json", Encoders.product[RawData].schema) as 'data)
  .select("data.*").as[RawData]
  .withColumn("reactor", explode($"reactors")) // Handles the array of reactors, making one row in the dataset per reactor.

After having processed the JSON and put it into my defined schema, I could select each specific sensor like this:

val tester = rawDataset.select($"pump.current", $"settings.pumpTimer")
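From there the rest follows the usual Structured Streaming pattern. Purely as an illustration (console sink, and the column choice is arbitrary), the exploded reactor fields can be selected the same way and the query started with:

// Illustration only: select the exploded reactor fields alongside a pump value
val perReactor = rawDataset.select($"reactor.id", $"reactor.status", $"pump.current")

perReactor.writeStream
  .outputMode("append")
  .format("console")
  .start()
  .awaitTermination()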

Thank you user6910411 for pointing me in the right direction.