Currently, I am using Spark Structured Streaming to create DataFrames of random data of the form (id, timestamp_value, device_id, temperature_value, comment).

[Screenshot: Spark DataFrame per batch]

Based on the screenshot of the data frame above, I would like to have some descriptive statistics for the column "temperature_value". For example, min, max, mean, count, variance.

My approach in Python is the following:

import sys
import json
import psycopg2
import numpy as np
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, explode, from_json, get_json_object, lit, split, to_json, unix_timestamp
from pyspark.sql.types import *  # StructType, StructField, DoubleType, StringType, DecimalType are used below
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.stat import Summarizer
from pyspark.mllib.stat import Statistics

spark = SparkSession.builder.appName(<spark_application_name>).getOrCreate()
spark.sparkContext.setLogLevel("WARN")
spark.streams.active

data = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "kafka_broker:<port_number>").option("subscribe", <topic_name>).option("startingOffsets", "latest").load()

schema = StructType([
    StructField("id", DoubleType()),
    StructField("timestamp_value", DoubleType()), 
    StructField("device_id", DoubleType()), 
    StructField("temperature_value", DoubleType()),
    StructField("comment", StringType())])

telemetry_dataframe = data.selectExpr("CAST(value AS STRING)").select(from_json(col("value").cast("string"), schema).alias("tmp")).select("tmp.*")

telemetry_dataframe.printSchema()

temperature_value_selection = telemetry_dataframe.select("temperature_value")

temperature_value_selection_new = temperature_value_selection.withColumn("device_temperature", temperature_value_selection["temperature_value"].cast(DecimalType()))

temperature_value_selection_new.printSchema()

assembler = VectorAssembler(
  inputCols=["device_temperature"], outputCol="temperatures"
)

assembled = assembler.transform(temperature_value_selection_new)

assembled_new = assembled.withColumn("timestamp", F.current_timestamp())

assembled_new.printSchema()

# scaler = StandardScaler(inputCol="temperatures", outputCol="scaledTemperatures", withStd=True, withMean=False).fit(assembled)

# scaled = scaler.transform(assembled)

summarizer = Summarizer.metrics("max", "min", "variance", "mean", "count")

descriptive_table_one = assembled_new.withWatermark("timestamp", "4 minutes").select(summarizer.summary(assembled_new.temperatures))
#descriptive_table_one = assembled_new.withWatermark("timestamp", "4 minutes").groupBy(F.col("timestamp")).agg(max(F.col('timestamp')).alias("timestamp")).orderBy('timestamp', ascending=False).select(summarizer.summary(assembled.temperatures))

#descriptive_table_one = assembled_new.select(summarizer.summary(assembled.temperatures))

# descriptive_table_two = temperature_value_selection_new.select(summarizer.summary(temperature_value_selection_new.device_temperature))


# -------------------------------------------------------------------------------------

#########################################
#               QUERIES                 #
#########################################

query_1 = telemetry_dataframe.writeStream.outputMode("append").format("console").trigger(processingTime = "5 seconds").start()#.awaitTermination()

query_2 = temperature_value_selection_new.writeStream.outputMode("append").format("console").trigger(processingTime = "8 seconds").start()#.awaitTermination()

query_3= assembled_new.writeStream.outputMode("append").format("console").trigger(processingTime = "11 seconds").start()#.awaitTermination()

#query_4_1 = descriptive_table_one.writeStream.outputMode("complete").format("console").trigger(processingTime = "14 seconds").start()#.awaitTermination()
query_4_2 = descriptive_table_one.writeStream.outputMode("append").format("console").trigger(processingTime = "17 seconds").start()#.awaitTermination()

Summarizer documentation.

In the code above, I isolate the column "temperature_value" and then vectorize it (using VectorAssembler) to create the column "temperatures" of type vector.

What I would like is to output the result of the "Summarizer" function to my console, which is why I use "append" for outputMode and the "console" format. But I was getting this error: pyspark.sql.utils.AnalysisException: 'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark'. So I added the "withWatermark" function, but I still get the same error with outputMode "append".

When I tried to change the outputMode to "complete", my terminal was instantly terminating the spark streaming.

[Screenshot: instant streaming termination]

My questions:

  1. How should I use the "withWatermark" function in order to output the summary statistics of the vector column "temperatures" to my console?

  2. Is there any other approach to calculate descriptive statistics for a custom column of my data frame, which I may miss?

I appreciate any help in advance.

EDIT (20.12.2019)

A solution has been given and accepted. However, I now get the following error:

[Two screenshots of the error]

Do you want to generate the stats per batch or multiple batches? How long are you going to wait till necessary records are available (= how many batches to wait till stats are generated)? - Jacek Laskowski
@JacekLaskowski sorry for the late response. I want to generate statistics per batch. But having checked the watermark approach, I figured out that if I set a watermark of 10 minutes, the descriptive statistics will auto-update for the last 10 minutes. Thus, I will have statistics for all of my batches (in total I generate 8-10 batches). Both approaches are acceptable: either statistics per batch or statistics across all my batches (10-minute watermark). The problem is that I don't know how to use the watermark approach in Structured Streaming, even though I have already read the relevant documentation. - NikSp
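
For the per-batch variant mentioned in the comment above, one possible sketch (not the accepted solution) is to compute the statistics inside writeStream.foreachBatch, assuming Spark 2.4 or later. Inside foreachBatch each micro-batch is an ordinary DataFrame, so a plain aggregation should not run into the append/watermark restriction. The names summarize_batch, STAT_EXPRESSIONS and the temperature_* aliases are hypothetical; the column name temperature_value comes from the question.

```python
# Spark SQL aggregate expressions for the statistics named in the question.
# var_samp is the sample variance; var_pop would give the population variance.
STAT_EXPRESSIONS = [
    "min(temperature_value) AS temperature_min",
    "max(temperature_value) AS temperature_max",
    "avg(temperature_value) AS temperature_mean",
    "count(temperature_value) AS temperature_count",
    "var_samp(temperature_value) AS temperature_variance",
]


def summarize_batch(batch_df, batch_id):
    """Print descriptive statistics for one micro-batch.

    batch_df is the non-streaming DataFrame that foreachBatch passes in,
    batch_id is the micro-batch number. Both names are illustrative.
    """
    print(f"Batch {batch_id}")
    batch_df.selectExpr(*STAT_EXPRESSIONS).show(truncate=False)


# Usage with the question's streaming DataFrame (not executed here):
# query = (telemetry_dataframe.writeStream
#          .foreachBatch(summarize_batch)
#          .trigger(processingTime="5 seconds")
#          .start())
# query.awaitTermination()
```

This gives one small table per batch. It does not cover the "last 10 minutes" variant, which would still need a windowed aggregation on an event-time column.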

1 Answer

When I tried to change the outputMode to "complete", my terminal was instantly terminating the spark streaming.

All your streaming queries are up and running, but (the main thread of) the pyspark application does not give them a chance to run for long: every .awaitTermination() call is commented out (#.awaitTermination()), so nothing keeps the main thread from exiting.

You should block the main thread of the pyspark application using StreamingQuery.awaitTermination(), e.g. query_1.awaitTermination()
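
Since the question starts several queries, a minimal sketch of blocking on all of them could look like the following. The helper name block_until_terminated is hypothetical. It only relies on each query exposing awaitTermination(timeout), which pyspark's StreamingQuery does.

```python
def block_until_terminated(queries, timeout=None):
    """Keep the calling (main) thread alive while the given queries run.

    With timeout=None each call blocks until that query stops or fails.
    With a timeout in seconds, awaitTermination returns whether the query
    terminated within that time, and those results are collected here.
    """
    return [query.awaitTermination(timeout) for query in queries]


# Usage at the very end of the script (not executed here):
# block_until_terminated([query_1, query_2, query_3, query_4_2])
```

The queries are awaited one after another, so the call returns only once all of them have stopped. If the application should instead stop waiting as soon as any one query terminates, spark.streams.awaitAnyTermination() is an alternative.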