
I used Spark Streaming to handle online requirements such as counting new users per hour, like this:

In each batch, when a log record arrives, I look up the uid in an external table such as HBase or DynamoDB; if it does not exist, I insert it into the table.

This approach hits the table so frequently that it costs too much.

Now I want to use Structured Streaming to solve this problem.

The following SQL solves the problem offline:

sql1

create table event_min_table as select pageid, uid, floor(min(time)/3600)*3600 as event_time from event_table group by pageid, uid

sql2

select pageid, event_time, count(distinct uid) as cnt from event_min_table group by pageid, event_time
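To make concrete what the two queries compute, here is a small pure-Python sketch (not Spark code; the sample rows and function names are hypothetical) that applies sql1 and then sql2 to a few events of the form (pageid, uid, time in epoch seconds):

```python
from collections import defaultdict

def hour_bucket(t):
    # floor(time/3600)*3600 in sql1: round the timestamp down to the hour
    return (t // 3600) * 3600

def first_seen_per_user(events):
    # sql1: earliest event time per (pageid, uid), floored to the hour
    first = {}
    for pageid, uid, t in events:
        key = (pageid, uid)
        if key not in first or t < first[key]:
            first[key] = t
    return {key: hour_bucket(t) for key, t in first.items()}

def new_users_per_hour(event_min_table):
    # sql2: distinct uids per (pageid, event_time)
    counts = defaultdict(set)
    for (pageid, uid), event_time in event_min_table.items():
        counts[(pageid, event_time)].add(uid)
    return {key: len(uids) for key, uids in counts.items()}

events = [
    ("p1", "u1", 7200),   # u1 first seen in hour bucket 7200
    ("p1", "u1", 7300),   # later event, ignored by min()
    ("p1", "u2", 7400),   # u2 also first seen in bucket 7200
    ("p1", "u3", 11000),  # u3 first seen in bucket 10800
]
print(new_users_per_hour(first_seen_per_user(events)))
# {('p1', 7200): 2, ('p1', 10800): 1}
```

Each uid is counted once, in the hour of its earliest event, which is exactly the "new users per hour" number.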

I am not very familiar with Structured Streaming, but as far as I know it does not support multiple aggregations in one query, so I did the following:

  1. readStream to create a query like sql1, register it as an in-memory table, with output mode complete

  2. create a second query over that table using sql2, with output mode update, and save the result to the external table (HBase or DynamoDB)
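To see how this two-step plan could reduce external writes, here is a pure-Python simulation of it (not Spark code; all names are hypothetical): each micro-batch updates the in-memory min-time table, the counts are recomputed, and only changed (pageid, event_time) rows are pushed to the external store, mimicking update output mode.

```python
from collections import defaultdict

def run_batches(batches):
    # step 1: in-memory stand-in for event_min_table (complete mode)
    first_seen = {}        # (pageid, uid) -> hour bucket of min(time)
    last_emitted = {}      # rows already pushed downstream
    external_writes = 0

    for batch in batches:
        for pageid, uid, t in batch:
            bucket = (t // 3600) * 3600
            key = (pageid, uid)
            if key not in first_seen or bucket < first_seen[key]:
                first_seen[key] = bucket

        # step 2: recompute counts, emit only changed rows (update mode)
        counts = defaultdict(set)
        for (pageid, uid), bucket in first_seen.items():
            counts[(pageid, bucket)].add(uid)
        for key, uids in counts.items():
            cnt = len(uids)
            if last_emitted.get(key) != cnt:
                last_emitted[key] = cnt
                external_writes += 1   # one upsert to HBase/DynamoDB
    return last_emitted, external_writes

batches = [
    [("p1", "u1", 7200), ("p1", "u1", 7300)],  # 2 logs, 1 new user
    [("p1", "u2", 7400), ("p1", "u1", 7500)],  # 2 logs, 1 new user
]
result, writes = run_batches(batches)
print(result, writes)
# {('p1', 7200): 2} 2
```

Here 4 log records result in only 2 external writes (one changed row per batch), instead of one lookup per log record.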

I don't know whether this approach solves the problem, and I have several questions:

  1. If I create a memory table in complete output mode, will the data keep growing as time goes on?

  2. Even if this works, will a result be output every time a log record arrives? If so, the problem is still not solved; my goal is to reduce the number of requests to the external table, such as HBase or DynamoDB.

"but I have several questions" only one question at a time (per StackOverflow rules). I'm still unclear what you're asking for. - Jacek Laskowski

1 Answer


1) If I create a memory table in complete output mode, will the data keep growing as time goes on?

I don't think so (see the code).

my goal is to decrease the request to the external table

You can get full control over what goes into the state store, when, and for how long, using the KeyValueGroupedDataset.flatMapGroupsWithState operator:

flatMapGroupsWithState Applies the given function to each group of data, while maintaining a user-defined per-group state. The result Dataset will represent the objects returned by the function. For a static batch Dataset, the function will be invoked once per group. For a streaming Dataset, the function will be invoked for each group repeatedly in every trigger, and updates to each group's state will be saved across invocations.

That's the most control you can get in Structured Streaming over the past and current datasets.