I have created an Azure Stream Analytics job that reads input data from Event Hub and writes to Cosmos DB and Blob storage.

Sometimes the data from Event Hub arrives duplicated, and as a result duplicate data is written to Cosmos DB and Blob storage.

A sample of the input data sent from Event Hub to Stream Analytics is shown below.

[
{
               "idnum":"00011XXX01",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               [
                              {
                                             "sig3":"04XXX",
                                             "id":1
                              },
                              {
                                             "sig3":"000000",
                                             "id":61
                              }
               ],
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               [
                              {
                                             "sig3":"03XXX",
                                             "id":1
                              },
                              {
                                             "sig3":"04XXX",
                                             "id":1
                              }
               ],
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               [
                              {
                                             "sig3":"03XXX",
                                             "id":1
                              },
                              {
                                             "sig3":"04XXX",
                                             "id":1
                              }
               ],
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               [
                              {
                                             "sig3":"03XXX",
                                             "id":1
                              },
                              {
                                             "sig3":"04XXX",
                                             "id":1
                              }
               ],
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               [
                              {
                                             "sig3":"03XXX",
                                             "id":1
                              },
                              {
                                             "sig3":"04XXX",
                                             "id":1
                              }
               ],
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00026XXX03",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               [
                              {
                                             "sig3":"03XXX",
                                             "id":1
                              },
                              {
                                             "sig3":"000000",
                                             "id":61
                              }
               ],
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
}
]

In the above sample, the event with idnum 00086XXX02 appears four times (the original plus three duplicates).

I am running the query below, and its output contains the duplicates.

WITH temp AS (
    SELECT
        input.idnum AS IDNUM,
        input.basetime AS BASETIME,
        input.time AS TIME,
        ROUND(input.sig1,5) AS SIG1,
        flatArrayElement as SIG2,
        udf.sgnlArrayMap(input.signals, input.basetime) AS SGNL -- UDF to process the signals in input
    FROM [input01] as input
    CROSS APPLY GetArrayElements(input.sig2) AS flatArrayElement
    WHERE GetArrayLength(input.sig2) >=1
 ),
SIGNALS AS (
  SELECT * FROM temp T JOIN master M ON T.SIG2.ArrayValue.sig3 = M.sig3 
)

--Insert SIG2 to COSMOS Container
SELECT 
    t.IDNUM,
    t.BASETIME,
    t.TIME,
    t.SIG1,
    t.SIG2.ArrayValue.id AS ID,
    t.SIG2.ArrayValue.sig3 AS SIG3,
    t.SGNL
INTO [CosmosTbl]
FROM SIGNALS PARTITION BY PartitionId

The output looks like the following, with duplicate events present for "idnum":"00086XXX02".

[
{
               "idnum":"00011XXX01",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"04XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00011XXX01",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"000000",
               "id":61
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},                           
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"03XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"04XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"03XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"04XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"03XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"04XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"03XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"04XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
}
]

The expected output is the same events without duplicates (for the sample provided, there should be no duplicate events for "idnum":"00086XXX02").

Before writing the data to storage, I want to remove the duplicate events. Is it possible to do this from Stream Analytics?

Creating the Cosmos DB collection with a unique ID is a solution on the Cosmos DB side, but here the collection already exists. Can we do anything from the Stream Analytics side?

2 Answers

0
votes

I simplified your test SQL as below:

with t AS (
    SELECT
        flatArrayElement as SIG2
    FROM fromblob as input
    CROSS APPLY GetArrayElements(input.sig2) AS flatArrayElement
    WHERE GetArrayLength(input.sig2) >=1
 )
SELECT 
    t.SIG2.ArrayValue.id AS ID,
    t.SIG2.ArrayValue.sig3 AS SIG3
FROM t PARTITION BY PartitionId

It produces repeated rows because of the GetArrayElements() method, which I think is normal: once the array is split, each input event yields one output row per array element.

Based on my experience and my findings (plus this feedback), there is no distinct method in ASA. I think the reason is that ASA processes real-time streaming data, not static data like a SQL table, so it cannot judge whether data is duplicated within each unit of time.

Combined with the previous Cosmos DB case (How to find Duplicate documents in Cosmos DB), I think the key to a solution is finding the root cause: why Event Hub is delivering duplicate source data. You could certainly set up a Cosmos DB trigger to prevent duplicate data from flowing into the database, but I don't think that is an effective approach.
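To make the row fan-out concrete, here is a minimal offline sketch in Python (not Stream Analytics syntax) of what CROSS APPLY GetArrayElements(input.sig2) does to the sample input. The helper name cross_apply_sig2 and the trimmed-down events are assumptions made for illustration; only the idnum and sig2 fields from the question are kept.

```python
from typing import Any


def cross_apply_sig2(events: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Emit one row per sig2 element, mimicking CROSS APPLY GetArrayElements."""
    rows = []
    for event in events:
        for index, element in enumerate(event.get("sig2", [])):
            rows.append({
                "idnum": event["idnum"],
                "ArrayIndex": index,      # position of the element in sig2
                "sig3": element["sig3"],
                "id": element["id"],
            })
    return rows


# Trimmed copies of the sample events: one 00011XXX01 and four 00086XXX02.
first = {"idnum": "00011XXX01",
         "sig2": [{"sig3": "04XXX", "id": 1}, {"sig3": "000000", "id": 61}]}
repeated = {"idnum": "00086XXX02",
            "sig2": [{"sig3": "03XXX", "id": 1}, {"sig3": "04XXX", "id": 1}]}
events = [first] + [repeated] * 4

rows = cross_apply_sig2(events)
print(len(rows))                                            # 10
print(sum(1 for r in rows if r["idnum"] == "00086XXX02"))   # 8
```

Under these assumptions, the four copies of 00086XXX02 each expand into two rows, which matches the eight 00086XXX02 rows in the question's output. The flattening multiplies rows per event; the repeated events themselves already exist in the input.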

0
votes

You can use DISTINCT to remove duplicate events within a window. Online documentation is available: https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-stream-analytics-query-patterns#remove-duplicate-events-in-a-window

Example:

With Temp AS ( 
SELECT      
  COUNT(DISTINCT Time) AS CountTime,    
  Value,    
  DeviceId  
FROM   Input TIMESTAMP BY Time  
GROUP BY   Value,  DeviceId,   SYSTEM.TIMESTAMP() 
)  
SELECT  
  AVG(Value) AS AverageValue,  
  DeviceId  
INTO Output  
FROM Temp  
GROUP BY DeviceId,TumblingWindow(minute, 5)
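As a rough illustration of what "removing duplicates in a window" means for the question's data, the following Python sketch (again an offline model, not Stream Analytics syntax) keeps the first event per key inside each fixed-size window. The function name dedupe_in_windows, the arrival field (seconds), the 5-second window, and the choice of (idnum, time) as the duplicate key are all assumptions; the question does not say which fields identify a duplicate.

```python
from typing import Any


def dedupe_in_windows(events: list[dict[str, Any]],
                      key_fields: tuple[str, ...],
                      window_seconds: int) -> list[dict[str, Any]]:
    """Keep the first event per key within each tumbling window."""
    seen: set[tuple] = set()
    unique = []
    for event in events:
        # Start of the tumbling window this event falls into.
        window_start = event["arrival"] // window_seconds * window_seconds
        key = (window_start, *(event[f] for f in key_fields))
        if key not in seen:
            seen.add(key)
            unique.append(event)
    return unique


# "arrival" is a hypothetical arrival time in seconds, not a field from the question.
events = [
    {"idnum": "00011XXX01", "time": 189834, "arrival": 1},
    {"idnum": "00086XXX02", "time": 189834, "arrival": 1},
    {"idnum": "00086XXX02", "time": 189834, "arrival": 2},
    {"idnum": "00086XXX02", "time": 189834, "arrival": 3},
    {"idnum": "00086XXX02", "time": 189834, "arrival": 7},
]

unique = dedupe_in_windows(events, ("idnum", "time"), window_seconds=5)
print([(e["idnum"], e["arrival"]) for e in unique])
# [('00011XXX01', 1), ('00086XXX02', 1), ('00086XXX02', 7)]
```

The copy that arrives at second 7 falls into the next window and survives, so a windowed approach only removes duplicates that land in the same window. The window size has to be chosen to cover the expected delay between a record and its duplicates.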