4 votes

Runtime: Spark 2.3.0, Scala 2.11 (Databricks 4.1 ML beta)

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

// Kafka settings and df definition go here

val query = df.writeStream.format("parquet")
.option("path", ...)
.option("checkpointLocation",...)
.trigger(continuous(30000))
.outputMode(OutputMode.Append)
.start

Throws the error: not found: value continuous

Other attempts that did not work:

.trigger(continuous = "30 seconds") //as per Databricks blog
// throws same error as above

.trigger(Trigger.Continuous("1 second")) //as per Spark docs
// throws java.lang.IllegalStateException: Unknown type of trigger: ContinuousTrigger(1000)

References:

(Databricks Blog) https://databricks.com/blog/2018/03/20/low-latency-continuous-processing-mode-in-structured-streaming-in-apache-spark-2-3-0.html

(Spark guide) http://spark.apache.org/docs/2.3.0/structured-streaming-programming-guide.html#continuous-processing

(Scaladoc) https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.sql.streaming.package


4 Answers

5 votes

Spark 2.3.0 does not support a parquet sink in continuous processing mode; you have to use a Kafka, console, or memory sink instead.

To quote the Databricks blog post on continuous processing mode in Structured Streaming:

You can set the optional Continuous Trigger in queries that satisfy the following conditions: Read from supported sources like Kafka and write to supported sinks like Kafka, memory, console.
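For illustration, here is a minimal sketch of a Kafka-to-Kafka continuous query; the broker address, topic names, and checkpoint path are placeholders:

    import org.apache.spark.sql.streaming.Trigger

    val query = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092") // placeholder broker
      .option("subscribe", "input-topic")              // placeholder topic
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092") // placeholder broker
      .option("topic", "output-topic")                 // placeholder topic
      .option("checkpointLocation", "/tmp/checkpoint") // placeholder path
      .trigger(Trigger.Continuous("30 seconds"))
      .start()

Note that the interval passed to Trigger.Continuous is a checkpoint interval, not a batch interval: records are processed as soon as they arrive, and the interval only controls how often the query checkpoints its progress.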

0 votes

As the following Spark code (from StreamingQueryManager) shows, only sinks that implement the StreamWriteSupport interface can use ContinuousTrigger:

    (sink, trigger) match {
      case (v2Sink: StreamWriteSupport, trigger: ContinuousTrigger) =>
        UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
        new StreamingQueryWrapper(new ContinuousExecution(
          sparkSession,
          userSpecifiedName.orNull,
          checkpointLocation,
          analyzedPlan,
          v2Sink,
          trigger,
          triggerClock,
          outputMode,
          extraOptions,
          deleteCheckpointOnStop))
      case _ =>
        new StreamingQueryWrapper(new MicroBatchExecution(
          sparkSession,
          userSpecifiedName.orNull,
          checkpointLocation,
          analyzedPlan,
          sink,
          trigger,
          triggerClock,
          outputMode,
          extraOptions,
          deleteCheckpointOnStop))
    }

And only three sinks implement this interface: ConsoleSinkProvider, KafkaSourceProvider, and MemorySinkV2.
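So, for example, a sketch that swaps the question's parquet sink for the memory sink (backed by MemorySinkV2) would be accepted by a continuous trigger; the query name here is hypothetical:

    import org.apache.spark.sql.streaming.{OutputMode, Trigger}

    val query = df.writeStream
      .format("memory")                          // MemorySinkV2 implements StreamWriteSupport
      .queryName("continuous_demo")              // hypothetical in-memory table name
      .trigger(Trigger.Continuous("30 seconds"))
      .outputMode(OutputMode.Append)
      .start()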

0 votes

As of Spark 3.0.1, continuous processing mode is still experimental and only supports specific query types, depending on their source and sink.

According to the documentation on Continuous Processing, only the following queries are supported, and writing parquet is not among them:

As of Spark 2.4, only the following type of queries are supported in the continuous processing mode.

Operations: Only map-like Dataset/DataFrame operations are supported in continuous mode, that is, only projections (select, map, flatMap, mapPartitions, etc.) and selections (where, filter, etc.).
   All SQL functions are supported except aggregation functions (since aggregations are not yet supported), current_timestamp() and current_date() (deterministic computations using time are challenging).
Sources:
   Kafka source: All options are supported.
   Rate source: Good for testing. Only options that are supported in the continuous mode are numPartitions and rowsPerSecond.
Sinks:
   Kafka sink: All options are supported.
   Memory sink: Good for debugging.
   Console sink: Good for debugging. All options are supported. Note that the console will print every checkpoint interval that you have specified in the continuous trigger.
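A minimal sketch that stays within these constraints, pairing the rate source (for testing) with the console sink:

    import org.apache.spark.sql.streaming.Trigger

    val query = spark.readStream
      .format("rate")                 // test source; numPartitions and rowsPerSecond are supported
      .option("rowsPerSecond", "10")
      .option("numPartitions", "2")
      .load()
      .where("value % 2 = 0")         // selection: allowed in continuous mode
      .select("timestamp", "value")   // projection: allowed in continuous mode
      .writeStream
      .format("console")
      .trigger(Trigger.Continuous("1 second")) // console prints once per checkpoint interval
      .start()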
-1 votes

Try using .trigger(Trigger.ProcessingTime("1 second")) instead.

This will work; I had the same issue, and it was resolved using this approach.
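Note that Trigger.ProcessingTime is a micro-batch trigger, not continuous processing, which is exactly why the parquet sink is accepted. Applied to the question's query (the paths here are placeholders), a sketch looks like:

    import org.apache.spark.sql.streaming.{OutputMode, Trigger}

    val query = df.writeStream
      .format("parquet")
      .option("path", "/output/parquet")             // placeholder path
      .option("checkpointLocation", "/checkpoints")  // placeholder path
      .trigger(Trigger.ProcessingTime("30 seconds")) // micro-batch trigger
      .outputMode(OutputMode.Append)
      .start()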