Below is my streaming DataFrame, created from a weblog file:

 val finalDf = joinedDf
    .groupBy(window($"dateTime", "10 seconds"))
    .agg(
      max(col("dateTime")).as("visitdate"),
      count(col("ipaddress")).as("number_of_records"),
      collect_list("ipaddress").as("ipaddress")
    )
    // Flatten the collected list back to one row per IP address.
    .select(col("window"), col("visitdate"), col("number_of_records"), explode(col("ipaddress")).as("ipaddress"))
    .join(joinedDf, Seq("ipaddress"))
    .select(
      col("window"),
      col("category").as("category_page_category"),
      col("category"),
      col("calculation1"),
      hour(col("dateTime")).cast("string").as("hour_label"),
      col("dateTime").cast("string").as("date_label"),
      minute(col("dateTime")).cast("string").as("minute_label"),
      col("demography"),
      col("fullname").as("full_name"),
      col("ipaddress"),
      col("number_of_records"),
      col("endpoint").as("pageurl"),
      col("pageurl").as("page_url"),
      col("username"),
      col("visitdate"),
      col("productname").as("product_name")
    ).dropDuplicates()

No aggregation is performed on this DataFrame earlier in the pipeline; I have applied aggregation only once, yet I still get the following error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;


1 Answer


There are indeed two aggregations here. The first one is explicit:

.groupBy(...).agg(...)

The second one is introduced by

.dropDuplicates()

which, under the hood, is implemented as something like

.groupBy(...).agg(first(...), ...)
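
As a rough illustration on a static DataFrame (df is a stand-in for any DataFrame; the column names are borrowed from your schema purely for illustration):

    import org.apache.spark.sql.functions.first

    // Dropping duplicates by a key column...
    df.dropDuplicates("ipaddress")

    // ...is conceptually a grouping aggregation that keeps the first
    // value of every other column per key:
    df.groupBy("ipaddress")
      .agg(first("visitdate").as("visitdate"), first("pageurl").as("pageurl"))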

Since Structured Streaming does not support more than one streaming aggregation per query, you'll have to redesign your pipeline; one possible direction is sketched below.
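
For example (a sketch only, assuming dateTime is your event-time column and that deduplicating the input before the windowed aggregation matches your intended semantics; the 10-minute watermark is an arbitrary placeholder you would need to tune):

    import org.apache.spark.sql.functions._

    // Deduplicate the input stream first: streaming deduplication is a
    // stateful operation, but it is not a second aggregation.
    val deduped = joinedDf
      .withWatermark("dateTime", "10 minutes")
      .dropDuplicates("ipaddress", "dateTime")

    // Then apply the single windowed aggregation, as before.
    val aggregated = deduped
      .groupBy(window($"dateTime", "10 seconds"))
      .agg(
        max(col("dateTime")).as("visitdate"),
        count(col("ipaddress")).as("number_of_records"),
        collect_list("ipaddress").as("ipaddress")
      )
    // ...continue with the rest of the pipeline, without the trailing
    // dropDuplicates() that introduced the second aggregation.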