14
votes

I have a Spark Structured Streaming job that reads from S3, transforms the data, and then stores it to one S3 sink and one Elasticsearch sink.

Currently, I am doing readStream once and then writeStream.format("").start() twice. When doing so, it seems that Spark reads the data from the S3 source twice, once per sink.

Is there a more efficient way to write to multiple sinks in the same pipeline?


4 Answers

13
votes

Currently, I am doing readStream once and then writeStream.format("").start() twice.

You actually create two separate streaming queries. The load part merely describes the first (and only) streaming source; it does nothing execution-wise.

When doing so, it seems that Spark reads the data from the S3 source twice, once per sink.

That is indeed how Spark Structured Streaming queries work. The number of sinks corresponds to the number of queries, because one streaming query can have exactly one streaming sink (see StreamExecution, which sits behind any streaming query).

You can also check the number of threads (using jconsole or a similar tool), as Structured Streaming uses one microBatchThread per streaming query (again, see StreamExecution).
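To make that concrete, here is a minimal sketch of the pattern from the question (the schema, paths and the Elasticsearch connector options below are placeholder assumptions); every start() call creates its own StreamingQuery, and every query reads the source on its own:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder().master("local[*]").getOrCreate()

val inputSchema = new StructType().add("id", "string").add("value", "double")

val input = spark.readStream
  .schema(inputSchema)
  .format("json")
  .load("s3a://bucket/input/")           // placeholder source path

val s3Query = input.writeStream          // start() #1 => streaming query #1
  .format("parquet")
  .option("checkpointLocation", "s3a://bucket/checkpoints/parquet/")
  .start("s3a://bucket/output/")

val esQuery = input.writeStream          // start() #2 => streaming query #2
  .format("es")                          // assuming the elasticsearch-hadoop connector
  .option("checkpointLocation", "s3a://bucket/checkpoints/es/")
  .start("my-index")                     // placeholder index name

// Two StreamingQuery instances, each with its own StreamExecution,
// so the S3 source is read once per query.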

Is there a more efficient way to write to multiple sinks in the same pipeline?

It is not possible in the current design of Spark Structured Streaming.

5
votes

What you want to do is cache() the data after reading it once and then use it multiple times. I do not believe Spark Structured Streaming currently supports caching (see here); however, you can use Spark Streaming. It is a lower-level API compared to Structured Streaming (it operates on the underlying RDDs rather than DataFrames/Datasets). From the Spark Streaming documentation:

Similar to RDDs, DStreams also allow developers to persist the stream’s data in memory. That is, using the persist() method on a DStream will automatically persist every RDD of that DStream in memory. This is useful if the data in the DStream will be computed multiple times (e.g., multiple operations on the same data).

Using the Spark Streaming API, you can call DStream.cache() on the data. This marks the underlying RDDs as cached, which should prevent a second read. Spark Streaming will unpersist the RDDs automatically after a timeout; you can control that behavior with the spark.cleaner.ttl setting. Note that the default value is infinite, which I would not recommend in a production setting.
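For illustration, a minimal Spark Streaming sketch (the batch interval and paths are placeholder assumptions) in which one cached DStream feeds two output operations:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[*]").setAppName("cached-dstream-sketch")
val ssc = new StreamingContext(conf, Seconds(60))       // placeholder batch interval

val lines = ssc.textFileStream("path/to/input/dir")     // placeholder source directory
lines.cache()                                           // every RDD of this DStream is persisted

lines.print()                                           // output operation 1: console
lines.saveAsTextFiles("path/to/output/prefix")          // output operation 2: text files
// both output operations reuse the cached RDDs instead of re-reading the source

ssc.start()
ssc.awaitTermination()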

Instead of using DStream.cache(), where you need to wait for the spark.cleaner.ttl timeout, there is another way to cache the data: use foreachRDD to access the underlying RDDs directly and unpersist each RDD as soon as you are done with it.

dstream.foreachRDD { rdd =>
  rdd.cache()
  // perform any transformations, etc.
  rdd.saveAsTextFile(...)   // write to each sink here
  rdd.unpersist(true)
}
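Passing true to unpersist makes the call block until the cached blocks are actually removed, so memory is freed right away rather than at some later cleanup.
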
3
votes

I am also looking for a solution to this problem. I want to write some records of a dataframe to sink1 and the other records to sink2 (depending on some condition), without reading the same data twice in two streaming queries. It does not seem possible with the current implementation (the createSink() method in DataSource.scala supports a single sink).

However, in Spark 2.4.0 there is a new API coming: foreachBatch(), which gives you a handle to the dataframe of each micro-batch. That dataframe can be cached, written to different sinks, or processed multiple times before being uncached again. Something like this:

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()
}

Right now this feature is available in the Databricks runtime: https://docs.databricks.com/spark/latest/structured-streaming/foreach.html#reuse-existing-batch-data-sources-with-foreachbatch
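For reference, a sketch of how that snippet could be wired into a running query (streamingDF is the stream from above; the checkpoint path, sink formats and output paths are placeholder assumptions):

import org.apache.spark.sql.DataFrame

val query = streamingDF.writeStream
  .option("checkpointLocation", "/tmp/foreach-batch-checkpoint")   // placeholder path
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist()
    batchDF.write.format("parquet").save("/tmp/sink1")             // placeholder sink 1
    batchDF.write.format("json").save("/tmp/sink2")                // placeholder sink 2
    batchDF.unpersist()
  }
  .start()

query.awaitTermination()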

0
votes

This worked for me. The code below is written in Scala 2.13.3.

package com.spark.structured.stream.multisink

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types.{StructType, TimestampType}

object MultipleStreamingSink extends App {

  val spark = SparkSession
    .builder()
    .master("local[*]")
    .getOrCreate()

  import spark.implicits._

  val csvSchema = new StructType()
    .add("name", "string")
    .add("age", "integer")
    .add("num", "integer")
    .add("date", "string")

  val sample = spark.readStream
    .schema(csvSchema)
    .format("csv")
    .options(Map("inferSchema" -> "true", "delimiter" -> ",", "header" -> "true"))
    .load("path/to/input/dir")

  val sample1 = sample
    .withColumn("datetime", col("date").cast(TimestampType))
    .drop("date")

  val sampleAgg1 = sample1
    .withWatermark("datetime", "10 minutes")
    .groupBy(window($"datetime", "5 minutes", "5 minutes"), col("name"))
    .agg(count("age").alias("age_count"))

  val sampleAgg2 = sample1
    .withWatermark("datetime", "10 minutes")
    .groupBy(window($"datetime", "5 minutes", "5 minutes"), col("age"))
    .agg(count("name").alias("name_count"))

  // I have used console to stream the output, use your sinks accordingly
  val sink1 = sampleAgg1
    .withColumn("window_start_time", col("window.start"))
    .withColumn("window_end_time", col("window.end"))
    .drop("window")
    .writeStream
    .queryName("count by name")
    .option("checkpointLocation", "/tmp/1")
    .outputMode(OutputMode.Update())
    .trigger(Trigger.ProcessingTime("60 seconds"))
    .format("console")
    .option("numRows", 100)
    .option("truncate", false)
    .start()

  val sink2 = sampleAgg2
    .withColumn("window_start_time", col("window.start"))
    .withColumn("window_end_time", col("window.end"))
    .drop("window")
    .writeStream
    .queryName("count by age")
    .option("checkpointLocation", "/tmp/2")
    .outputMode(OutputMode.Update())
    .trigger(Trigger.ProcessingTime("60 seconds"))
    .format("console")
    .option("numRows", 100)
    .option("truncate", false)
    .start()

  sink1.awaitTermination()
  sink2.awaitTermination()
}
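Note that sink1.awaitTermination() only blocks the main thread until that query stops; both queries still run concurrently. spark.streams.awaitAnyTermination() is an alternative when managing several queries.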

This is my sample CSV file content:

name,age,num,date
abc,28,123,2021-06-01T07:15:00
def,27,124,2021-06-01T08:16:00
abc,28,125,2021-06-01T07:15:00
ghi,28,126,2021-06-01T07:17:00