
Spark offers some great streaming functionality. Recently, R gained streaming capabilities as well, via sparklyr's structured streaming support (https://spark.rstudio.com/guides/streaming/).

Structured Streaming (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) supports many JOIN variants, including joins watermarked within a certain window.

How can I use these windowing capabilities with sparklyr?

Edit:

I am interested in two cases:

Windowed aggregation

(Scala)

val windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()

(R) stream_watermark(df, column = "timestamp", threshold = "10 minutes") replaces .withWatermark("timestamp", "10 minutes"). But where can I find an equivalent of window($"timestamp", "10 minutes", "5 minutes")?
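
My guess, based on dbplyr passing functions it cannot translate through to Spark SQL unchanged, is that window() can be written directly inside group_by(). A minimal, untested sketch, where sdf stands for a streaming tbl shaped like the Scala words dataset:

windowed_counts <- sdf %>%
  stream_watermark(column = "timestamp", threshold = "10 minutes") %>%
  group_by(
    time_window = window(timestamp, "10 minutes", "5 minutes"),
    word
  ) %>%
  summarise(count = n())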

Streaming joins

How is https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#inner-joins-with-optional-watermarking ported to sparklyr?

// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)

Triggers

How can I set triggers as defined in: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers

stream_trigger_interval() can specify fixed trigger intervals, but what about the unspecified (default), run-once, or continuous trigger modes?

1 Answer


For the windowed aggregation:

library(sparklyr)
library(dplyr)
library(future)

conf <- spark_config()
spark <- spark_connect(master = "local", config = conf)

# Folders that simulate a file-based streaming source and sink.
source <- "streaming_source"
destination <- "streaming_destination"
if (file.exists(source)) unlink(source, recursive = TRUE)
if (file.exists(destination)) unlink(destination, recursive = TRUE)

# Generate sample events: a timestamp, a numeric value and a category.
length_df <- 1000
dates <- base::sample(seq(as.Date('2018-01-01'), as.Date('2019-06-01'), by = "day"), size = length_df, replace = TRUE)
values <- rnorm(length_df)
event_category <- base::sample(c("regular", "error", "security_alert"), length_df, replace = TRUE)
sampling_df <- data.frame(values, dates, event_category)
sampling_df <- sampling_df %>%
  rename(timestamp = dates) %>%
  rename(category = event_category) %>%
  rename(value = values)
head(sampling_df)

# Write a first batch of test data into the source folder.
stream_generate_test(df = sampling_df, iterations = 1, path = source)

read_folder <- stream_read_csv(spark, source)
process_stream <- read_folder %>%
  stream_watermark(column = "timestamp", threshold = "5 days") %>%
  # window() reaches Spark SQL untranslated: 7-day windows, sliding every 3 days.
  group_by(time_window = window(timestamp, "7 days", "3 days"), category) %>%
  summarise(
    mean = mean(value, na.rm = TRUE),
    count = n()
  ) %>%
  # Split the window struct into its start and end timestamps.
  sdf_separate_column("time_window", into = c("beginning", "end")) %>%
  select(-time_window) %>%
  arrange(desc(count))

my_table <- "stream"
write_output <- stream_write_memory(process_stream, name = my_table)
##########################################
tbl(spark, my_table)  # execute repeatedly
tbl(spark, my_table)  # execute repeatedly
tbl(spark, my_table)  # execute repeatedly
##########################################

# Keep feeding the source folder in the background, then watch the
# aggregation update live.
invisible(future(stream_generate_test(df = sampling_df, interval = 0.2, iterations = 100, path = source)))
stream_view(write_output)
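
This answers the window() part of the question: the call survives dplyr's translation because dbplyr passes functions it does not recognize through to Spark SQL unchanged, so window(timestamp, "7 days", "3 days") corresponds to Scala's window($"timestamp", "7 days", "3 days"), i.e. overlapping 7-day windows that start every 3 days. sdf_separate_column() then splits the resulting window struct into its start and end timestamps.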

Streaming joins

  • static-stream joins work just fine
  • stream-stream:
    • INNER: works (one possible approach is sketched below)
    • OUTER: TODO figure out; fails with: "Stream-stream outer join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition"
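
For the INNER case, one possible route (an untested sketch: the source paths, column names and watermark thresholds below are invented to mirror the Scala snippet in the question) is to register both watermarked streams as temporary views with sdf_register() and write the event-time constraint in raw Spark SQL via sdf_sql():

impressions_wm <- stream_read_csv(spark, "impressions_source") %>%
  stream_watermark(column = "impressionTime", threshold = "2 hours")
clicks_wm <- stream_read_csv(spark, "clicks_source") %>%
  stream_watermark(column = "clickTime", threshold = "3 hours")

# Expose both streams as temp views so raw Spark SQL can reference them.
sdf_register(impressions_wm, "impressions")
sdf_register(clicks_wm, "clicks")

# Join with event-time constraints, mirroring the Scala example.
joined <- sdf_sql(spark, "
  SELECT *
  FROM impressions
  JOIN clicks
    ON clickAdId = impressionAdId
   AND clickTime >= impressionTime
   AND clickTime <= impressionTime + INTERVAL 1 HOUR
")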

Triggers

TODO figure out
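
A partial answer, as far as I can tell from the sparklyr reference: every stream_write_* function takes a trigger argument, and sparklyr ships stream_trigger_interval() and stream_trigger_continuous(). Leaving trigger at its default gives the unspecified behaviour (run each micro-batch as soon as the previous one finishes); I have not found a sparklyr wrapper for Spark's run-once trigger. A sketch, assuming the process_stream and read_folder objects from the aggregation example above:

# Fixed interval: fire a micro-batch every 5 seconds.
write_fixed <- stream_write_memory(
  process_stream,
  name = "stream_fixed_trigger",
  trigger = stream_trigger_interval(interval = 5000)
)

# Continuous processing is experimental in Spark and only supports
# map-like queries, so it is applied to the raw, un-aggregated stream.
write_continuous <- stream_write_memory(
  read_folder,
  name = "stream_continuous_trigger",
  trigger = stream_trigger_continuous()
)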