
I am using Kafka with Spark Structured Streaming. I am receiving Kafka messages in the following format.

{"deviceId":"001","sNo":1,"data":"aaaaa"}
{"deviceId":"002","sNo":1,"data":"bbbbb"}
{"deviceId":"001","sNo":2,"data":"ccccc"}
{"deviceId":"002","sNo":2,"data":"ddddd"}

I am reading them like this:

Dataset<String> data = spark
      .readStream()
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as(Encoders.STRING());
Dataset<DeviceData> ds = data
      .as(ExpressionEncoder.javaBean(DeviceData.class))
      .orderBy("deviceId", "sNo");
ds.foreach(event ->
      processData(event.getDeviceId(), event.getSNo(), event.getData().getBytes())
);

private void processData(String deviceId, int sNo, byte[] data)
{
  // How do I check state from the previously processed Dataset here???
}

In my JSON messages, "data" is the String form of a byte[]. I have a requirement to process the binary "data" for a given "deviceId" in order of "sNo". So for "deviceId"="001", I have to process the binary data for "sNo"=1, then "sNo"=2, and so on. How can I check the state of the previously processed Dataset in Structured Streaming?
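Outside of Spark, the per-device ordering requirement can be sketched in plain Java (the class and method names below are illustrative, not part of the question's code): buffer messages that arrive early and release each device's messages only once every lower sNo has been processed.

```java
import java.util.*;

// Hypothetical sketch: releases each device's messages strictly in sNo
// order, parking any message whose predecessors have not arrived yet.
public class InOrderProcessor {
    // last sNo processed per deviceId; every device starts before sNo 1
    private final Map<String, Integer> lastSno = new HashMap<>();
    // out-of-order messages buffered per device, keyed by sNo
    private final Map<String, TreeMap<Integer, byte[]>> pending = new HashMap<>();

    /** Returns the sNos that became processable because of this message, in order. */
    public List<Integer> accept(String deviceId, int sNo, byte[] data) {
        pending.computeIfAbsent(deviceId, d -> new TreeMap<>()).put(sNo, data);
        TreeMap<Integer, byte[]> queue = pending.get(deviceId);
        List<Integer> released = new ArrayList<>();
        int next = lastSno.getOrDefault(deviceId, 0) + 1;
        // drain the buffer while the next expected sNo is available
        while (queue.containsKey(next)) {
            byte[] payload = queue.remove(next);
            // processData(deviceId, next, payload) would be called here
            released.add(next);
            lastSno.put(deviceId, next);
            next++;
        }
        return released;
    }
}
```

For example, if "sNo"=2 for device "001" arrives first, it is buffered; when "sNo"=1 arrives, both are released in order. This is only the sequencing logic; wiring it into a streaming job still needs the keyed-state support discussed in the answer below.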

What did you try so far? - Jan
I have updated my code, please check. I am doing orderBy and then foreach to process the data. I am stuck in the processData method: how do I handle current and previous data from the Dataset received by the stream? - user7615505

1 Answer


If you are looking for state management like DStream.mapWithState, it is not yet supported in Structured Streaming; work on it is in progress. See https://issues.apache.org/jira/browse/SPARK-19067.
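The semantics DStream.mapWithState provides can be sketched in plain Java, independent of Spark: each (key, value) pair is processed together with that key's previous state, producing a new state and an output. All names below are illustrative, not a Spark API.

```java
import java.util.*;
import java.util.function.*;

// Minimal sketch of mapWithState-style keyed state, outside Spark.
// fn receives the key's previous state (null on first sight) and the new
// value, and returns the (newState, output) pair.
public class KeyedState<K, S, V, O> {
    private final Map<K, S> store = new HashMap<>();

    /** Applies fn to the value and the key's previous state, keeping the new state. */
    public O update(K key, V value, BiFunction<S, V, Map.Entry<S, O>> fn) {
        Map.Entry<S, O> result = fn.apply(store.get(key), value);
        store.put(key, result.getKey());
        return result.getValue();
    }
}
```

Applied to the question, the key would be deviceId, the state the last processed sNo, and the output whether the incoming sNo is the next expected one; a Spark-managed equivalent of this keyed store is what the linked JIRA tracks.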