What I am trying to do:

I want to "throttle" an input stream to its output. Specifically, as I receive multiple similar inputs, I only want to produce an output if one hasn't already been produced in the last N hours.

For example, the input could be thought of as "send an email", but I will get dozens or hundreds of those events. I only want to send an email if I haven't already sent one in the last N hours (or have never sent one).

The final example at https://docs.microsoft.com/en-us/stream-analytics-query/join-azure-stream-analytics#examples does something similar to what I am trying to do.

What my setup looks like:

There are two inputs to my query:

  1. Ingress: this is the "raw" input stream
  2. Throttled-Sent: a separate consumer group that reads back my own output stream

My query is as follows:

WITH
AllEvents as (
    /* This CTE is here only because we can't seem to use GetMetadataPropertyValue in a join clause, so "materialize" it here for use later */
    SELECT
        *,
        GetMetadataPropertyValue([Ingress], '[User].[Type]') AS Type,
        GetMetadataPropertyValue([Ingress], '[User].[NotifyType]') AS NotifyType,
        GetMetadataPropertyValue([Ingress], '[User].[NotifyEntityId]') AS NotifyEntityId
    FROM
        [Ingress]
),
UseableEvents as (
    SELECT  *
    FROM    AllEvents
    WHERE   NotifyEntityId IS NOT NULL
),
AlreadySentEvents as (
    /* These are the events that would have been previously output (referenced here via a consumer group). We want to capture these to make sure we are not sending new events when an older "already-sent" event can be found */
    SELECT
        *,
        GetMetadataPropertyValue([Throttled-Sent], '[User].[Type]') AS Type,
        GetMetadataPropertyValue([Throttled-Sent], '[User].[NotifyType]') AS NotifyType,
        GetMetadataPropertyValue([Throttled-Sent], '[User].[NotifyEntityId]') AS NotifyEntityId
    FROM
        [Throttled-Sent]
)

SELECT  i.*
INTO    Throttled
FROM    UseableEvents i
/* Left join our sent events, looking for those within a particular time frame */
LEFT OUTER JOIN AlreadySentEvents s
    ON i.Type = s.Type
    AND i.NotifyType = s.NotifyType
    AND i.NotifyEntityId = s.NotifyEntityId
    AND DATEDIFF(hour, i, s) BETWEEN 0 AND 4
WHERE   s.Type IS NULL /* IS NULL keeps only the Ingress rows that have no matching AlreadySentEvents row */

The results I'm seeing:

This query produces no output rows. I expected it to produce some, because the Throttled-Sent input starts out empty, so every Ingress row should survive the left join. I have verified that my Ingress events arrive (by removing the left join from the query and checking the results).

I suspect the problem is one of the following:

  1. I can't use a consumer group on my own output as an input (though I don't know why that wouldn't be allowed)
  2. My DATEDIFF usage (or my understanding of it) is incorrect

Appreciate any help/guidance/direction!

Answer:

For throttling, I would recommend looking at the IsFirst function; it may be a simpler solution that doesn't require reading back from the output.
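A minimal sketch of that approach, assuming a fixed 4-hour throttle and the same Ingress input, Throttled output, and metadata properties as in the question (the column name FirstInWindow is only illustrative). As far as I know, IsFirst works over fixed tumbling windows rather than "N hours since the last email", so two events just either side of a window boundary can both get through; if that matters, the join-based approach may still be needed. The NotifyEntityId filter stays in its own step because, as I understand the docs, IsFirst is evaluated before the WHERE clause of the same SELECT.

```sql
WITH
AllEvents AS (
    /* Same metadata extraction as in the question */
    SELECT
        *,
        GetMetadataPropertyValue([Ingress], '[User].[Type]') AS Type,
        GetMetadataPropertyValue([Ingress], '[User].[NotifyType]') AS NotifyType,
        GetMetadataPropertyValue([Ingress], '[User].[NotifyEntityId]') AS NotifyEntityId
    FROM
        [Ingress]
),
UseableEvents AS (
    /* Filter here, in a separate step, so IsFirst only sees usable events */
    SELECT  *
    FROM    AllEvents
    WHERE   NotifyEntityId IS NOT NULL
),
Flagged AS (
    /* Assumed 4-hour window: 1 for the first event per (Type, NotifyType, NotifyEntityId)
       in each 4-hour tumbling window, 0 for every later event in that window */
    SELECT
        *,
        IsFirst(hour, 4) OVER (PARTITION BY Type, NotifyType, NotifyEntityId) AS FirstInWindow
    FROM
        UseableEvents
)

SELECT  *
INTO    Throttled
FROM    Flagged
WHERE   FirstInWindow = 1
```

With this shape the Throttled-Sent input is no longer needed, since the job never has to read its own output.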

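For the current query, I think the order of the DATEDIFF arguments needs to be swapped, because s (the already-sent event) comes before i: DATEDIFF(hour, s, i) BETWEEN 0 AND 4. As written, DATEDIFF(hour, i, s) BETWEEN 0 AND 4 only matches sent events that occur after the incoming event, so the left join may have to wait out that 4-hour window before it can emit an unmatched Ingress row.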
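Applied to the question's final SELECT, the change would look like this sketch. It assumes the three CTEs (AllEvents, UseableEvents, AlreadySentEvents) stay exactly as posted; only the DATEDIFF line differs.

```sql
SELECT  i.*
INTO    Throttled
FROM    UseableEvents i
LEFT OUTER JOIN AlreadySentEvents s
    ON i.Type = s.Type
    AND i.NotifyType = s.NotifyType
    AND i.NotifyEntityId = s.NotifyEntityId
    /* DATEDIFF(unit, start, end): s is the earlier event and i the later one,
       so this matches sent events from 0 to 4 hours before the incoming event */
    AND DATEDIFF(hour, s, i) BETWEEN 0 AND 4
WHERE   s.Type IS NULL /* keep only Ingress rows with no recent sent event */
```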