Below is my streaming DataFrame, created from a weblog file:
val finalDf = joinedDf
  .groupBy(window($"dateTime", "10 seconds"))
  .agg(
    max(col("datetime")).as("visitdate"),
    count(col("ipaddress")).as("number_of_records"),
    collect_list("ipaddress").as("ipaddress")
  )
  .select(col("window"), col("visitdate"), col("number_of_records"), explode(col("ipaddress")).as("ipaddress"))
  .join(joinedDf, Seq("ipaddress"))
  .select(
    col("window"),
    col("category").as("category_page_category"),
    col("category"),
    col("calculation1"),
    hour(col("dateTime")).as("hour_label").cast("String"),
    col("dateTime").as("date_label").cast("String"),
    minute(col("dateTime")).as("minute_label").cast("String"),
    col("demography"),
    col("fullname").as("full_name"),
    col("ipaddress"),
    col("number_of_records"),
    col("endpoint").as("pageurl"),
    col("pageurl").as("page_url"),
    col("username"),
    col("visitdate"),
    col("productname").as("product_name")
  ).dropDuplicates().toDF()
No aggregations are performed on this DataFrame before this stage. I apply an aggregation only once, but I still get the error below:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;