3
votes

I'm using structured streaming from Spark 3.0.

What I'm trying to do is write data to multiple sinks. I need to write a DataFrame to Kafka so another process can use it, and I also need to store the same data in Cassandra for later use (dashboards, etc.).

For this, I wrote the code below, referring to the official doc here.

 merged_stream.writeStream
      //.trigger(Trigger.ProcessingTime("3 seconds"))
      .foreachBatch((batchDF: DataFrame, batchId: Long) => {
        batchDF.persist()
        batchDF.write
          .format("kafka")
          .option("kafka.bootstrap.servers", brokers)
          .option("kafka.compression.type", sinkCompressionType)
          .option("topic", mergeTopic)
          .mode("append")
          .save()
        batchDF.write
          .format("org.apache.spark.sql.cassandra")
          .cassandraFormat(cassandraTable, cassandraKeyspace, cassandraCluster)
          .mode("append")
          .save()
        batchDF.unpersist() //**this is the problem!!**//
      })
      .option("checkpointLocation", checkpointDir)
      .start()
      .awaitTermination()

However, whenever I put batchDF.unpersist() at the end of foreachBatch, I get a compile error:

[error]   (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] <and>
[error]   (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
[error]  cannot be applied to ((org.apache.spark.sql.DataFrame, scala.Long) => org.apache.spark.sql.DataFrame)
[error]       .foreachBatch({(batchDF: DataFrame, batchId: Long) => {
[error]        ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed

When I remove the batchDF.unpersist(), it works normally and I checked that the data goes into both Kafka and Cassandra correctly. However, it obviously soon runs out of memory, since the cached data stays in memory.

I also tried sparkSession.catalog.clearCache(), but it doesn't seem to work as I intended.

Why is this error happening, given that my code is practically the same as the documentation's? And how can I fix it?

Thanks in advance.

1
Guessing from the error message and several experiments, I confirmed that the code fails to compile when the function in foreachBatch ends with batchDF.unpersist(). So I'm just putting some meaningless logic at the end of the function to avoid the compilation error (such as if(false) {}), but I still don't know why this happens. - suyeon lee
This is a Scala issue caused by the fact that the last line of a method is its return value, so the compiled signature doesn't match the expected one. Try extracting all the function code inside foreachBatch into a method that declares a Unit return type; that would solve your issue. - yishaiz
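As a rough sketch of what that comment suggests, keeping the writes exactly as in the question (the method name writeToSinks is made up here, it just has to declare Unit as its return type):

    def writeToSinks(batchDF: DataFrame, batchId: Long): Unit = {
      batchDF.persist()
      // ... Kafka and Cassandra writes from the question ...
      batchDF.unpersist() // fine as the last line: the declared return type is Unit
    }

    merged_stream.writeStream
      .foreachBatch(writeToSinks _)
      .option("checkpointLocation", checkpointDir)
      .start()
      .awaitTermination()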

1 Answer

2
votes

Spark has been providing two different methods, one for Scala and one for Java, because Scala didn't generate Java lambdas before Scala 2.12.

  /**
   * Applies a function `f` to all rows.
   *
   * @group action
   * @since 1.6.0
   */
  def foreach(f: T => Unit): Unit = withNewRDDExecutionId {
    rdd.foreach(f)
  }

  /**
   * (Java-specific)
   * Runs `func` on each element of this Dataset.
   *
   * @group action
   * @since 1.6.0
   */
  def foreach(func: ForeachFunction[T]): Unit = foreach(func.call(_))

That was for the convenience of Java users, but once Spark started supporting Scala 2.12, these methods conflict with each other.

There was a relevant discussion in the Spark community, but the decision appears to have been to keep API compatibility. That said, unfortunately, you need to "strictly" match one of the two method signatures, for example by adding Unit at the end of the lambda.
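For instance, here is a minimal sketch of the call from the question with an explicit Unit value () added as the last expression, so the lambda is inferred as (DataFrame, Long) => Unit and resolves to the Scala-specific overload (all identifiers such as brokers, mergeTopic, and checkpointDir are assumed to be defined as in the question):

    merged_stream.writeStream
      .foreachBatch((batchDF: DataFrame, batchId: Long) => {
        batchDF.persist()
        batchDF.write
          .format("kafka")
          .option("kafka.bootstrap.servers", brokers)
          .option("kafka.compression.type", sinkCompressionType)
          .option("topic", mergeTopic)
          .mode("append")
          .save()
        batchDF.write
          .format("org.apache.spark.sql.cassandra")
          .cassandraFormat(cassandraTable, cassandraKeyspace, cassandraCluster)
          .mode("append")
          .save()
        batchDF.unpersist()
        () // make the block's value Unit so the lambda matches (DataFrame, Long) => Unit
      })
      .option("checkpointLocation", checkpointDir)
      .start()
      .awaitTermination()

Giving the function an explicit type, e.g. a val of type (DataFrame, Long) => Unit passed to foreachBatch, disambiguates the overloads in the same way.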