I'm using Spark 2.3.2 and running into an issue when doing a union of two or more streaming sources from Kafka. Each one is a Kafka stream that I've already transformed and stored in a DataFrame.
Ideally, I want to store the results of the unioned DataFrame in Parquet format in HDFS, or potentially even write them back to Kafka. The ultimate goal is to store these merged events with as low a latency as possible.
    val finalDF = flatDF1
      .union(flatDF2)
      .union(flatDF3)
    val query = finalDF.writeStream
      .format("parquet")
      .outputMode("append")
      .option("path", hdfsLocation)
      .option("checkpointLocation", checkpointLocation)
      .option("failOnDataLoss", false)
      .start()
    query.awaitTermination()
When I write the stream to the console instead of Parquet, I get the expected results, but the example above causes an assertion failure:
    Caused by: java.lang.AssertionError: assertion failed
        at scala.Predef$.assert(Predef.scala:156)
        at org.apache.spark.sql.execution.streaming.OffsetSeq.toStreamProgress(OffsetSeq.scala:42)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:185)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:124)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
Here is the class and the assertion that is failing:
    case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[OffsetSeqMetadata] = None) {
      assert(sources.size == offsets.size)
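To make the failing check concrete, here is a minimal sketch of how I read it. It uses simplified stand-in types (`SketchOffsetSeq`, plain strings for sources and offsets) that I made up for illustration; they are not Spark's classes. The way offsets get paired with sources is my assumption based on the `toStreamProgress` frame in the stack trace.

```scala
// Simplified stand-ins for illustration only; these are NOT Spark's classes.
object OffsetSeqSketch {

  // One optional offset slot per source, in source order (assumed layout).
  final case class SketchOffsetSeq(offsets: Seq[Option[String]]) {
    // Same shape as the failing check: the number of sources in the running
    // query must equal the number of recorded offset slots.
    def toStreamProgress(sources: Seq[String]): Map[String, String] = {
      assert(sources.size == offsets.size)
      sources.zip(offsets).collect { case (s, Some(o)) => s -> o }.toMap
    }
  }

  // Returns true when the size check throws an AssertionError.
  def assertionFails(seq: SketchOffsetSeq, sources: Seq[String]): Boolean =
    try { seq.toStreamProgress(sources); false }
    catch { case _: AssertionError => true }

  def main(args: Array[String]): Unit = {
    // Hypothetical names standing in for the three Kafka-backed sources.
    val sources = Seq("kafka-1", "kafka-2", "kafka-3")

    // Three sources and three recorded slots: the check passes.
    val matching = SketchOffsetSeq(Seq(Some("10"), Some("20"), None))
    println(matching.toStreamProgress(sources))

    // Three sources but only one recorded slot: the check fails.
    val mismatched = SketchOffsetSeq(Seq(Some("10")))
    println(assertionFails(mismatched, sources))
  }
}
```

If this reading is right, the assertion only trips when the offsets restored from the checkpoint describe a different number of sources than the query being started.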
Is this because the checkpoint is only storing the offsets for one of the DataFrames? From the Spark Structured Streaming documentation, it looked like joins and unions of streaming sources were possible in Spark 2.2 or later.
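One way I could test that theory is to count how many source entries a batch file under the checkpoint's `offsets` directory contains. The sketch below assumes the file layout is a version line, then a metadata line, then one line per source (with `-` meaning no offset recorded); that layout is my assumption from reading the checkpoint files, and the helper name `sourceSlots` and the sample content are made up for illustration.

```scala
object CheckpointOffsetCount {

  // Assumed layout of one batch file under <checkpointLocation>/offsets:
  //   line 1: version marker
  //   line 2: batch metadata as JSON (may be empty)
  //   remaining lines: one entry per source, "-" when no offset was recorded
  def sourceSlots(fileContent: String): Int =
    fileContent.split("\n").drop(2).count(_.trim.nonEmpty)

  def main(args: Array[String]): Unit = {
    // Made-up sample content with a single source entry.
    val sample = Seq(
      "v1",
      """{"batchWatermarkMs":0}""",
      """{"topicA":{"0":42}}"""
    ).mkString("\n")

    // A count lower than the number of unioned sources would support the theory.
    println(sourceSlots(sample))
  }
}
```

For a real check, the file content would have to be read from HDFS first; this sketch only covers the counting step on a string.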