1
votes

I'm using spark 2.3.2 and running into an issue doing a union on 2 or more streaming sources from Kafka. Each of these are streaming sources from Kafka that I've already transformed and stored in Dataframes.

I'd ideally want to store the results of this UNIONed dataframe in parquet format in HDFS or potentially even back into Kafka. The ultimate goal is to store these merged events with as low a latency as possible.

val finalDF = flatDF1
      .union(flatDF2)
      .union(flatDF3)

val query = finalDF.writeStream
      .format("parquet")
      .outputMode("append")
      .option("path", hdfsLocation)
      .option("checkpointLocation", checkpointLocation)
      .option("failOnDataLoss", false)
      .start()

    query.awaitTermination()

when doing a writeStream to console instead of parquet I'm getting the expected results, but the example above causes an assertion failure.

Caused by: java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:156)
    at org.apache.spark.sql.execution.streaming.OffsetSeq.toStreamProgress(OffsetSeq.scala:42)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:185)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:124)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)

here is the class and assertion that is failing:

case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[OffsetSeqMetadata] = None) {

assert(sources.size == offsets.size)

Is this because the checkpoint is only storing the offsets for one of the dataframes? Looking through the Spark Structured Streaming documentation it looked like it was possible to do joins/union of streaming sources in Spark 2.2 or >

1
why are you using checkpointing instead of manuel commit ?maxime G
@maximeG is it possible to do structured streaming without a checkpoint? How else can you maintain the state of what is already consumed from Kafka?Joe Shields
kafka itself have a topic for your consumer : __consumer_offsetsmaxime G
You should look at spark/kafka documentation for manual commit : spark.apache.org/docs/latest/… The benefit as compared to checkpoints is that Kafka is a durable store regardless of changes to your application code if you have some trouble with kafka, like you read same data twice, look at group.id and auto.offset.reset properties in your kafka consumermaxime G

1 Answers

1
votes

First, please define how your case class OffsetSeq is related to the code with the unions of the dataframes.

Next, checkpointing is a real issue when performing this union and then writing to Kafka with writestream. Separating into multiple writestreams - each with it's own checkpointing - confuses batch id's because of the union operating. Using the same writestream with union of dataframes fails with checkpointing since the checkpoint appears to seek all the models that generated the dataframes before the union and cannot distinguish what row/record came from what dataframe/model.

For writing to Kafka, from structured sql streaming unioned dataframes - best to use writestream with foreach and ForEachWriter including the Kafka Producer in the process method. No checkpointing is needed; the application the just uses temp checkpoint files which are set to be deleted when appropriate - set "forceDeleteTempCheckpointLocation" to true - in the session builder.

Anyway, I have just set up scala code to union an arbitrary number of streaming dataframes and then write to Kafka Producer. Appears to work well once all Kafka Producer code is placed in the ForEachWriter process method so that it can be serialized by Spark.

val output = dataFrameModelArray.reduce(_ union _)
val stream: StreamingQuery = output
  .writeStream.foreach(new ForeachWriter[Row] {

    def open(partitionId: Long, version: Long): Boolean = {
      true
    }

    def process(row: Row): Unit = {
      val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
      val record = new ProducerRecord[String, String](producerTopic, row.getString(0), row.getString(1))
      producer.send(record)
    }

    def close(errorOrNull: Throwable): Unit = {
    }
  }
).start()

Can add more logic in process method if needed.

Note prior to union, all dataframes to be unioned have been converted into key, value string columns. Value is a json string of the message data to be sent over the Kafka Producer. This is also very important to get write before the union is attempted.

svcModel.transform(query)
    .select($"key", $"uuid", $"currentTime", $"label", $"rawPrediction", $"prediction")
    .selectExpr("key", "to_json(struct(*)) AS value")
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

where svcModel is a dataframe in the dataFrameModelArray.