
I have two DataFrames: one is streamed using Spark Structured Streaming, and the other is a static one that I created. I am trying to join them.

But every approach I have tried produces this error:

"Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets"

raw_s = spark.readStream\
          .format("kafka")\
          .option("kafka.bootstrap.servers", "...")\
          .option("subscribe", "06-02-2020, 07-02-2020, 08-02-2020, 09-02-2020, 10-02-2020, 11-02-2020, 12-02-2020, 13-02-2020, 14-02-2020, 15-02-2020, 16-02-2020, 17-02-2020, 18-02-2020, 19-02-2020, 20-02-2020, 21-02-2020, 22-02-2020, 23-02-2020, 24-02-2020, 25-02-2020, 26-02-2020, 27-02-2020, 28-02-2020, 29-02-2020, 01-03-2020, 02-03-2020, 03-03-2020, 04-03-2020, 05-03-2020, 06-03-2020, 07-03-2020, 08-03-2020, 09-03-2020, 10-03-2020, 11-03-2020, 12-03-2020, 13-03-2020, 14-03-2020, 15-03-2020, 16-03-2020, 17-03-2020, 18-03-2020, 19-03-2020, 20-03-2020, 21-03-2020, 22-03-2020, 23-03-2020, 24-03-2020, 25-03-2020, 26-03-2020, 27-03-2020, 28-03-2020, 29-03-2020, 30-03-2020, 31-03-2020, 01-04-2020, 02-04-2020, 03-04-2020, 04-04-2020, 05-04-2020, 06-04-2020, 07-04-2020, 08-04-2020, 09-04-2020, 10-04-2020, 11-04-2020, 12-04-2020, 13-04-2020, 14-04-2020, 15-04-2020, 16-04-2020, 17-04-2020, 18-04-2020, 19-04-2020, 20-04-2020, 21-04-2020, 22-04-2020, 23-04-2020, 24-04-2020, 25-04-2020, 26-04-2020, 27-04-2020, 28-04-2020, 29-04-2020, 30-04-2020, 01-05-2020, 02-05-2020, 03-05-2020, 04-05-2020, 05-05-2020, 06-05-2020, 07-05-2020, 08-05-2020, 09-05-2020, 10-05-2020, 11-05-2020, 12-05-2020, 13-05-2020, 14-05-2020, 15-05-2020, 16-05-2020, 17-05-2020, 18-05-2020, 19-05-2020, 20-05-2020, 21-05-2020, 22-05-2020, 23-05-2020, 24-05-2020, 25-05-2020, 26-05-2020, 27-05-2020, 28-05-2020, 29-05-2020, 30-05-2020, 31-05-2020, 01-06-2020, 02-06-2020, 03-06-2020, 04-06-2020, 05-06-2020, 06-06-2020, 07-06-2020, 08-06-2020, 09-06-2020")\
          .option("startingOffsets", "earliest")\
          .load()

string_val = raw_s.selectExpr("CAST(value AS STRING)")
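
The topic names in the "subscribe" option follow a daily dd-MM-yyyy pattern, so the list could be generated instead of typed out. A minimal sketch, assuming the topics really are one per calendar day from 06-02-2020 to 09-06-2020; `daily_topics` is a hypothetical helper name, not part of Spark:

```python
from datetime import date, timedelta


def daily_topics(start, end, fmt="%d-%m-%Y"):
    """Return one topic name per calendar day from start to end, inclusive."""
    days = (end - start).days
    return [(start + timedelta(days=i)).strftime(fmt) for i in range(days + 1)]


# Comma-separated string suitable for .option("subscribe", topics)
topics = ",".join(daily_topics(date(2020, 2, 6), date(2020, 6, 9)))
```

The resulting `topics` string can then be passed as `.option("subscribe", topics)` in place of the literal list.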

The static DataFrame is:
extra:pyspark.sql.dataframe.DataFrame
state:string
date_only:string
new_cases:double
deaths:double

The streamed one:
tweet_id:long
user_id:long
date:string
keywords:array
  element:string
location:map
  key:string
  value:string
date_tmp:timestamp
date_only:string
country:string
state:string
city:string

And this is my code:

import pyspark.sql.functions as F
from pyspark.sql.types import *
from datetime import datetime

# Define the schema of the data:
schema = StructType()\
          .add("tweet_id", LongType(), False)\
          .add("user_id", LongType(), False)\
          .add("date", StringType(), True)\
          .add("keywords", ArrayType(StringType(), True), True)\
          .add("location", MapType(StringType(), StringType(), True), True)

# Read each 'value' String as JSON:
json_df = string_val.select(F.from_json(F.col("value"), schema= schema).alias('json'))
# Flatten the nested object:
streaming_df = json_df.select("json.*")
streaming_df = streaming_df.withColumn("date_tmp",F.to_timestamp(F.col('date'), "EEE MMM dd HH:mm:ss ZZZZ yyyy"))
streaming_df = streaming_df.withColumn("date_only", F.from_unixtime(F.unix_timestamp(streaming_df.date_tmp), "MM-dd-yyyy"))
streaming_df = streaming_df.withColumn("country", streaming_df.location.country)
streaming_df = streaming_df.withColumn("state", streaming_df.location.state)
streaming_df = streaming_df.withColumn("city", streaming_df.location.city)
streaming_df = streaming_df.where(streaming_df.country == 'United States')
streaming_df = streaming_df.where(streaming_df.state.isNotNull())
join_df = streaming_df.join(extra, ['date_only','state'],'inner')
display(join_df)

Output:
org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;
Structured Streaming? - thebluephantom
Yes, Spark Structured Streaming. @thebluephantom - shreder1921
Thought so. Are you showing all of the code? - thebluephantom
What is your output mode? Complete... - thebluephantom
It is the full code now. @thebluephantom - shreder1921

1 Answer


From the manual https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html:

Complete mode - The whole Result Table will be outputted to the sink after every trigger. This is supported for aggregation queries (ONLY).

Try append mode or add an aggregation.

But since your query contains a JOIN, you will need append mode. For JOINs, the guide says:

Update and Complete mode not supported yet for JOIN (as of Spark 2.4.5).

Complete mode does not drop old aggregation state, so keeping the whole result of an ever-growing stream could lead to OOM errors.
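
To make the rule of thumb concrete, here is a small plain-Python sketch of it. It is a simplified summary of the guide's support table as read in this answer, not a Spark API: `allowed_output_modes` is a hypothetical helper, it ignores watermarked aggregations (which also allow append), and the details may differ between Spark versions.

```python
def allowed_output_modes(has_aggregation, has_join):
    """Simplified summary of which output modes a streaming query may use.

    Assumption: follows the guide's table as quoted in this answer.
    """
    if has_join:
        # Joins: only append is listed as supported.
        return ["append"]
    if has_aggregation:
        # Aggregations without a watermark: complete or update.
        return ["complete", "update"]
    # Plain select/filter queries: no complete mode.
    return ["append", "update"]


# The query in the question: a join, no aggregation.
modes = allowed_output_modes(has_aggregation=False, has_join=True)
print(modes)
```

For the query in the question, complete mode is not in the returned list, which matches the error message.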

You need to try something like this:

  join_df.writeStream \
    .format("console")  \
    .outputMode("append") \
    .start() \
    .awaitTermination()

This is needed because display, if memory serves correctly, uses complete as its output mode.
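
If the goal is to inspect the joined rows in a notebook rather than on the console, one option is the in-memory sink in append mode, queried by name. A minimal sketch, assuming `join_df` and `spark` from the question exist; `start_append_query` and the table name `joined_preview` are hypothetical names chosen for illustration:

```python
def start_append_query(stream_df, sink="console", query_name=None):
    """Start a streaming query in append mode on the given sink.

    `stream_df` is expected to be a streaming DataFrame such as join_df.
    """
    writer = stream_df.writeStream.format(sink).outputMode("append")
    if query_name is not None:
        # The memory sink exposes its rows as a table with this name.
        writer = writer.queryName(query_name)
    return writer.start()


# Usage in a notebook (assumes `spark` and `join_df` are defined):
# query = start_append_query(join_df, sink="memory", query_name="joined_preview")
# spark.sql("SELECT * FROM joined_preview").show()
```

The memory sink keeps its rows on the driver, so it is only suitable for small volumes of data while debugging.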