I have two dataframes one is streamed using spark structured streaming and a static one that I have created. And i am trying to join them.
But in every way that i have tried i am getting this error:
"Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets"
raw_s = spark.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "...")\
.option("subscribe", "06-02-2020, 07-02-2020, 08-02-2020, 09-02-2020, 10-02-2020, 11-02-2020, 12-02-2020, 13-02-2020, 14-02-2020, 15-02-2020, 16-02-2020, 17-02-2020, 18-02-2020, 19-02-2020, 20-02-2020, 21-02-2020, 22-02-2020, 23-02-2020, 24-02-2020, 25-02-2020, 26-02-2020, 27-02-2020, 28-02-2020, 29-02-2020, 01-03-2020, 02-03-2020, 03-03-2020, 04-03-2020, 05-03-2020, 06-03-2020, 07-03-2020, 08-03-2020, 09-03-2020, 10-03-2020, 11-03-2020, 12-03-2020, 13-03-2020, 14-03-2020, 15-03-2020, 16-03-2020, 17-03-2020, 18-03-2020, 19-03-2020, 20-03-2020, 21-03-2020, 22-03-2020, 23-03-2020, 24-03-2020, 25-03-2020, 26-03-2020, 27-03-2020, 28-03-2020, 29-03-2020, 30-03-2020, 31-03-2020, 01-04-2020, 02-04-2020, 03-04-2020, 04-04-2020, 05-04-2020, 06-04-2020, 07-04-2020, 08-04-2020, 09-04-2020, 10-04-2020, 11-04-2020, 12-04-2020, 13-04-2020, 14-04-2020, 15-04-2020, 16-04-2020, 17-04-2020, 18-04-2020, 19-04-2020, 20-04-2020, 21-04-2020, 22-04-2020, 23-04-2020, 24-04-2020, 25-04-2020, 26-04-2020, 27-04-2020, 28-04-2020, 29-04-2020, 30-04-2020, 01-05-2020, 02-05-2020, 03-05-2020, 04-05-2020, 05-05-2020, 06-05-2020, 07-05-2020, 08-05-2020, 09-05-2020, 10-05-2020, 11-05-2020, 12-05-2020, 13-05-2020, 14-05-2020, 15-05-2020, 16-05-2020, 17-05-2020, 18-05-2020, 19-05-2020, 20-05-2020, 21-05-2020, 22-05-2020, 23-05-2020, 24-05-2020, 25-05-2020, 26-05-2020, 27-05-2020, 28-05-2020, 29-05-2020, 30-05-2020, 31-05-2020, 01-06-2020, 02-06-2020, 03-06-2020, 04-06-2020, 05-06-2020, 06-06-2020, 07-06-2020, 08-06-2020, 09-06-2020")\
.option("startingOffsets", "earliest")\
.load()
string_val = raw_s.selectExpr("CAST(value AS STRING)")
the static data frame is:
extra:pyspark.sql.dataframe.DataFrame
state:string
date_only:string
new_cases:double
deaths:double
the streamed one:
tweet_id:long
user_id:long
date:string
keywords:array
element:string
location:map
key:string
value:string
date_tmp:timestamp
date_only:string
country:string
state:string
city:string
and this is my code:
import pyspark.sql.functions as F
from pyspark.sql.types import *
from datetime import datetime
# Define the schema of the data:
schema = StructType()\
.add("tweet_id", LongType(), False)\
.add("user_id", LongType(), False)\
.add("date", StringType(), True)\
.add("keywords", ArrayType(StringType(), True), True)\
.add("location", MapType(StringType(), StringType(), True), True)
# Read each 'value' String as JSON:
json_df = string_val.select(F.from_json(F.col("value"), schema= schema).alias('json'))
# Flatten the nested object:
streaming_df = json_df.select("json.*")
streaming_df = streaming_df.withColumn("date_tmp",F.to_timestamp(F.col('date'), "EEE MMM dd HH:mm:ss ZZZZ yyyy"))
streaming_df = streaming_df.withColumn("date_only", F.from_unixtime(F.unix_timestamp(streaming_df.date_tmp), "MM-dd-yyyy"))
streaming_df = streaming_df.withColumn("country", streaming_df.location.country)
streaming_df = streaming_df.withColumn("state", streaming_df.location.state)
streaming_df = streaming_df.withColumn("city", streaming_df.location.city)
streaming_df = streaming_df.where(streaming_df.country == 'United States')
streaming_df = streaming_df.where(streaming_df.state.isNotNull())
join_df = streaming_df.join(extra, ['date_only','state'],'inner')
display(join_df)
output:
org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;