20 votes

I would like to do multiple aggregations in Spark Structured Streaming.

Something like this:

  • Read a stream of input files (from a folder)
  • Perform aggregation 1 (with some transformations)
  • Perform aggregation 2 (and more transformations)

When I run this in Structured Streaming, it gives me an error "Multiple streaming aggregations are not supported with streaming DataFrames/Datasets".

Is there a way to do such multiple aggregations in Structured Streaming?

Have you tried using the lower-level DStream abstraction? – Yuval Itzchakov
I was hoping to use Structured Streaming (Datasets/DataFrames). Can you point me to an example where something similar is done with DStreams? – Kaptrain
Any workaround for this issue? Please provide one. Same issue here. – BigD

8 Answers

14 votes

This is not supported, but there are workarounds. For example, you can perform the first aggregation and write its result to Kafka, then read that topic back and apply the next aggregation. This approach has worked for me.
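
Here's a minimal PySpark sketch of that idea, assuming a local Kafka broker at localhost:9092, an intermediate topic named agg1, and a made-up input schema (the names, paths and columns are illustrative, not from the original answer):

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StringType, LongType

spark = SparkSession.builder.appName("chained-aggregations").getOrCreate()

input_schema = (StructType()
                .add("site", StringType())
                .add("country", StringType())
                .add("hits", LongType()))

# Aggregation 1: read the input folder and publish the aggregate to Kafka.
agg1 = (spark.readStream
        .schema(input_schema)
        .json("/path/to/input")                     # hypothetical input folder
        .groupBy("site", "country")
        .agg(F.sum("hits").alias("hits")))

(agg1.select(F.to_json(F.struct("site", "country", "hits")).alias("value"))
     .writeStream
     .format("kafka")                               # needs the spark-sql-kafka-0-10 connector
     .option("kafka.bootstrap.servers", "localhost:9092")
     .option("topic", "agg1")
     .option("checkpointLocation", "/tmp/ckpt/agg1")
     .outputMode("update")                          # each changed group is re-emitted to the topic
     .start())

# Aggregation 2: read the intermediate topic back and aggregate again.
# Note: because query 1 re-emits updated groups, this query sees every update,
# so you may need to deduplicate or otherwise account for that downstream.
agg2 = (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "agg1")
        .load()
        .select(F.from_json(F.col("value").cast("string"), input_schema).alias("r"))
        .select("r.*")
        .groupBy("country")
        .agg(F.sum("hits").alias("total_hits")))

(agg2.writeStream
     .format("console")
     .outputMode("complete")
     .start())

spark.streams.awaitAnyTermination()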

4 votes

As of Spark 2.4.4 (the latest version at the time of writing), multiple streaming aggregations are NOT supported, but you can use the .foreachBatch() method instead.

A dummy example:

def foreach_batch_function(df, epoch_id):
    # Transformations (many aggregations) on the micro-batch DataFrame go here
    pass

query = (spark
         .readStream
         .format('kafka')
         .option(..)                     # connection options omitted in the original
         .load()
         .writeStream
         .trigger(processingTime='x seconds')
         .outputMode('append')
         .foreachBatch(foreach_batch_function)
         .start())

query.awaitTermination()
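
For example (the column names and output paths below are made up for illustration), the batch function can run several aggregations, because inside foreachBatch each micro-batch arrives as a plain static DataFrame:

def foreach_batch_function(df, epoch_id):
    # df is a static DataFrame holding just this micro-batch,
    # so multiple aggregations are allowed.
    by_country = df.groupBy("country").count()
    total = by_country.groupBy().sum("count")   # a second aggregation on top of the first

    # Write each result to a (hypothetical) sink.
    by_country.write.mode("append").parquet("/tmp/out/by_country")
    total.write.mode("append").parquet("/tmp/out/total")
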
3 votes

Multiple aggregates in Spark Structured Streaming are not supported as of Spark 2.4. Supporting this can be tricky, especially with event time in "update" mode, since the aggregate output could change with late events. It is much more straightforward to support this in "append" mode; however, Spark does not support true watermarks yet.

Here's a proposal to add it in "append" mode - https://github.com/apache/spark/pull/23576

If you're interested, you can watch the PR and post your votes there.

1 vote

TL;DR: this is not supported, but in some cases workarounds are possible.

Longer version:

  1. (a hack)

In some cases a workaround is possible. For example, if you'd like multiple count(distinct) results in a streaming query on low-cardinality columns, then approx_count_distinct can actually return the exact number of distinct elements if you set its rsd argument low enough (that's the second, optional argument of approx_count_distinct; it defaults to 0.05).

How is "low-cardinality" defined here? I don't recommend relying on this approach for columns that can have more than 1000 unique values.

So in your streaming query you can do something like this -

from pyspark.sql.functions import approx_count_distinct

(spark.readStream....
      .groupBy("site_id")
      .agg(approx_count_distinct("domain", 0.001).alias("distinct_domains")
         , approx_count_distinct("country", 0.001).alias("distinct_countries")
         , approx_count_distinct("language", 0.001).alias("distinct_languages")
      )
  )

In my tests this actually worked: count(distinct) and approx_count_distinct returned exactly the same results. Here's some guidance on the rsd argument of approx_count_distinct (a quick way to sanity-check an rsd value is sketched after this list):

  • for a column with 100 distinct values rsd of 0.02 was necessary;
  • for a column with 1000 distinct values rsd of 0.001 was necessary.

PS. Also notice that I had to comment out the experiment on a column with 10k distinct values, as I didn't have enough patience for it to complete. That's why I mentioned you should not use this hack for columns with more than 1k distinct values. Making approx_count_distinct match the exact count(distinct) on more than 1k distinct values would require an rsd far lower than what the HyperLogLog++ algorithm (the algorithm behind the approx_count_distinct implementation) was designed for.

  2. (a nicer but more involved way)

As somebody else mentioned, you can use Spark's arbitrary stateful streaming to implement your own aggregates, with as many aggregations as you need on a single stream, using [flat]MapGroupsWithState. Unlike the hack above, which only works in some cases, this is a legitimate and supported way to do it. Note that this method is only available in the Spark Scala API, not in PySpark.

  3. (perhaps this will be a long-term solution one day)

A proper fix would be native support for multiple aggregations in Structured Streaming - https://github.com/apache/spark/pull/23576 - vote on this Spark JIRA/PR and show your support if you're interested in this.
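
A quick, hypothetical way to sanity-check an rsd value for the hack in (1) is to compare approx_count_distinct against an exact countDistinct on a static DataFrame of similar cardinality:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("rsd-check").getOrCreate()

# 1000 distinct values, repeated a few times each.
df = spark.range(0, 5000).withColumn("domain", F.col("id") % 1000)

df.agg(
    F.countDistinct("domain").alias("exact"),
    F.approx_count_distinct("domain", 0.001).alias("approx"),
).show()
# If "exact" and "approx" agree, the chosen rsd is low enough for this cardinality.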

0 votes

This is not supported in Spark 2.0 since the Structured Streaming API is still experimental. Refer here to see a list of all current limitations.

0 votes

For Spark 2.2 and above (not sure about earlier versions), if you can design your aggregation to use flatMapGroupsWithState in append mode, you can do as many aggregations as you want. The restriction is mentioned here: Spark structured streaming - Output mode

0 votes

You did not provide any code, so I'm going with the example code referenced here.

Let's suppose the code below is our initial setup for the DataFrames we'll use.

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

spark = SparkSession. ...

# Read text from socket
socketDF = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

socketDF.isStreaming    # Returns True for DataFrames that have streaming sources

socketDF.printSchema()

# Read all the csv files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
    .readStream \
    .option("sep", ";") \
    .schema(userSchema) \
    .csv("/path/to/directory")  # Equivalent to format("csv").load("/path/to/directory")

Here we group the DataFrame by name and apply the aggregation functions count, sum and avg.

grouped = csvDF.groupBy("name").agg(F.count("name"), F.sum("age"), F.avg("age"))
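
To actually run this as a streaming query, the aggregated DataFrame still has to be started via writeStream; the console sink and complete output mode below are just one illustrative choice:

query = (grouped.writeStream
         .outputMode("complete")   # an un-watermarked aggregation needs complete or update mode
         .format("console")
         .start())

query.awaitTermination()
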
0 votes

As of Spark Structured Streaming 2.4.5, multiple aggregations are not supported in stateless processing, but it is possible to aggregate multiple times if you switch to stateful processing.

In append mode, you can apply the flatMapGroupsWithState API on a grouped Dataset (obtained via the groupByKey API) multiple times.