I'm using Structured Streaming in Spark 3.0.
What I'm trying to do is write data to multiple sinks: I need to write a DataFrame to Kafka for use in another process, and also store the same DataFrame in Cassandra for later use (a dashboard, etc.).
To do this, I wrote the code below, referring to the official documentation from here.
merged_stream.writeStream
  //.trigger(Trigger.ProcessingTime("3 seconds"))
  .foreachBatch((batchDF: DataFrame, batchId: Long) => {
    batchDF.persist()
    batchDF.write
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("kafka.compression.type", sinkCompressionType)
      .option("topic", mergeTopic)
      .mode("append")
      .save()
    batchDF.write
      .format("org.apache.spark.sql.cassandra")
      .cassandraFormat(cassandraTable, cassandraKeyspace, cassandraCluster)
      .mode("append")
      .save()
    batchDF.unpersist() //**this is the problem!!**//
  })
  .option("checkpointLocation", checkpointDir)
  .start()
  .awaitTermination()
However, whenever I put batchDF.unpersist() at the end of foreachBatch, I get a compile error:
[error] (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] <and>
[error] (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
[error] cannot be applied to ((org.apache.spark.sql.DataFrame, scala.Long) => org.apache.spark.sql.DataFrame)
[error] .foreachBatch({(batchDF: DataFrame, batchId: Long) => {
[error] ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
When I remove batchDF.unpersist(), it compiles and runs normally, and I checked that the data is written correctly to both Kafka and Cassandra. However, it soon runs out of memory, since each batch's cached data stays in memory.
I also tried sparkSession.catalog.clearCache(), but it does not seem to work as I intended.
Why is this error happening, when my code is practically identical to the documentation's example? And how can I fix it?
Thanks in advance.
My foreachBatch ends with batchDF.unpersist(), so for now I'm just adding meaningless logic at the end of the function (such as if (false) {}) to avoid the compilation error, but I still don't know why this is happening. - suyeon lee
Move the code inside foreachBatch to a method which declares that it returns Unit, and it would solve your issue. - yishaiz
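For reference, a minimal sketch of what yishaiz's comment suggests, reusing the variables from the question (brokers, mergeTopic, checkpointDir, etc.); the method name writeBatch is illustrative, not from the original post. Since unpersist() returns the Dataset itself, a lambda ending with it has result type DataFrame, which matches neither foreachBatch overload; a method declared to return Unit (or a lambda ending in a bare ()) discards that value, so the ambiguity never arises:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.cassandra._ // provides cassandraFormat

// The explicit Unit return type discards the DataFrame that unpersist() returns.
def writeBatch(batchDF: DataFrame, batchId: Long): Unit = {
  batchDF.persist()
  batchDF.write
    .format("kafka")
    .option("kafka.bootstrap.servers", brokers)
    .option("kafka.compression.type", sinkCompressionType)
    .option("topic", mergeTopic)
    .mode("append")
    .save()
  batchDF.write
    .cassandraFormat(cassandraTable, cassandraKeyspace, cassandraCluster)
    .mode("append")
    .save()
  batchDF.unpersist()
}

merged_stream.writeStream
  .foreachBatch(writeBatch _)
  .option("checkpointLocation", checkpointDir)
  .start()
  .awaitTermination()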