1
votes

I have been working with Azure Stream Analytics (ASA) lately, and I am trying to insert an ASA stream directly into a SQL table while joining it with reference data. I based my development on this MS article: https://msdn.microsoft.com/en-us/azure/stream-analytics/reference/reference-data-join-azure-stream-analytics.

Overview of data flow - telemetry:

  • I have many devices of different types (Heat Pumps, Batteries, Water Pumps, AirCon...). Each device type has a different JSON schema for its telemetry data. I can tell the JSON messages apart by an attribute in the message (e.g. "DeviceType":"HeatPump" or "DeviceType":"AirCon"...)
  • All of these devices send their telemetry to a single Event Hub
  • Behind the Event Hub there is a single Stream Analytics job, where I route the stream to different outputs based on the DeviceType attribute. For example, I route telemetry from HeatPumps with the query SELECT * INTO [output-sql-table] FROM [input-event-hub] WHERE DeviceType = 'HeatPump'
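
The routing described above can be sketched in a few lines of Python, purely to illustrate the logic; in the real setup the routing is done by one ASA SELECT ... INTO statement per device type. The "AirCon" output name is hypothetical and not taken from my job.

```python
# Illustration only: the real routing happens in the ASA query, not in Python.
# "output-sql-table" is the output named above; the AirCon entry is hypothetical.
OUTPUT_BY_DEVICE_TYPE = {
    "HeatPump": "output-sql-table",
    "AirCon": "output-sql-table-aircon",  # hypothetical output name
}


def route(event):
    """Return the output name for an event, or None if no rule matches."""
    return OUTPUT_BY_DEVICE_TYPE.get(event.get("DeviceType"))


heat_pump_target = route({"DeviceType": "HeatPump", "Power": 1.91})
unknown_target = route({"DeviceType": "Toaster"})
```

An event whose DeviceType matches no rule is not written to any output, which mirrors a message that satisfies none of the WHERE clauses.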

I would like to use reference data to "enrich" the ASA stream with some ID keys before inserting the stream into the SQL table.

What I've already done:

  • Successfully inserted the ASA stream directly into the SQL table using the ASA query SELECT * INTO [sql-table] FROM Input WHERE DeviceType = 'HeatPump', where [sql-table] has the same schema as the JSON message plus the standard columns (EventProcessedUtcTime, PartitionId, EventEnqueuedUtcTime)

  • Successfully inserted the ASA stream directly into the SQL table using the ASA query SELECT Column1, Column2, Column3... INTO [sql-table] FROM Input WHERE DeviceType = 'HeatPump'. This is basically the same query as above, only this time with named columns in the SELECT statement.

  • Generated a JSON file of reference data and put it into BLOB storage
  • Created a new static (not using the {date} and {time} placeholders) reference data input in ASA pointing to the file in BLOB storage
  • Joined the reference data to the data stream in the ASA query, using the same statement with named columns
  • Result: no output rows in the SQL table

While debugging the problem I used the Test functionality of the ASA query editor:

  • I sampled data from the Event Hub (stream data).

  • I uploaded sample data from a file (reference data).

  • After the sampling from the Event Hub had finished, I tested the query -> the output contained rows -> so the query itself does not seem to be the problem

  • Yet when I run the ASA job, no output rows are inserted into the SQL table.

Some other ideas I tried:

  • Used the TRY_CAST function to cast fields from the reference data to the appropriate data types before joining them with fields in the stream data

  • Used the TRY_CAST function to cast fields in the SELECT before inserting them into the SQL table

I really don't know what to do now. Any suggestions?


EDIT: added data stream JSON, reference data JSON, ASA query, ASA input configuration, BLOB storage configuration and ASA test output result

Data Stream JSON - single message

[
 {
    "Activation": 0,
    "AvailablePowerNegative": 6.0,
    "AvailablePowerPositive": 1.91,
    "DeviceID": 99999,
    "DeviceIsAvailable": true,
    "DeviceOn": true,
    "Entity": "HeatPumpTelemetry",
    "HeatPumpMode": 3,
    "Power": 1.91,
    "PowerCompressor": 1.91,
    "PowerElHeater": 0.0,
    "Source": "<omitted>",
    "StatusToPowerOff": 1,
    "StatusToPowerOn": 9,
    "Timestamp": "2018-08-29T13:34:26.0Z",
    "TimestampDevice": "2018-08-29T13:34:09.0Z"
 }
]

Reference data JSON - single message

[
 {
    "SourceID": 1,
    "Source": "<omitted>",
    "DeviceID": 10,
    "DeviceSourceCode": 99999,
    "DeviceName": "NULL",
    "DeviceType": "Heat Pump",
    "DeviceTypeID": 1
 }
]

ASA Query

WITH HeatPumpTelemetry AS
(
    SELECT 
        *
    FROM 
        [input-eh]
    WHERE 
        source='<omitted>'
        AND entity = 'HeatPumpTelemetry'
)
SELECT 
    e.Activation,
    e.AvailablePowerNegative,
    e.AvailablePowerPositive,
    e.DeviceID,
    e.DeviceIsAvailable,
    e.DeviceOn,
    e.Entity,
    e.HeatPumpMode,
    e.Power,
    e.PowerCompressor,
    e.PowerElHeater,
    e.Source,
    e.StatusToPowerOff,
    e.StatusToPowerOn,
    e.Timestamp,
    e.TimestampDevice,
    e.EventProcessedUtcTime,
    e.PartitionId,
    e.EventEnqueuedUtcTime
INTO
    [out-SQL-HeatPumpTelemetry]
FROM
    HeatPumpTelemetry e
    LEFT JOIN [input-json-devices] d ON
        TRY_CAST(d.DeviceSourceCode as BIGINT) = TRY_CAST(e.DeviceID AS BIGINT)
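
One way to separate a join-key mismatch from a reference-data loading problem is to emulate the join locally on the sample messages. The sketch below is plain Python, not ASA: `try_cast_bigint` and `left_join` are hypothetical helpers that only approximate TRY_CAST(... AS BIGINT) and LEFT JOIN, the JSON is trimmed to the join-relevant fields, and the second event with DeviceID "abc" is made up to show the unmatched case.

```python
import json


def try_cast_bigint(value):
    """Rough stand-in for TRY_CAST(value AS BIGINT): None instead of an error."""
    if isinstance(value, bool):
        return None  # treat booleans as non-numeric in this approximation
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def left_join(events, reference):
    """Emulate LEFT JOIN ON TRY_CAST(d.DeviceSourceCode) = TRY_CAST(e.DeviceID)."""
    index = {}
    for d in reference:
        key = try_cast_bigint(d.get("DeviceSourceCode"))
        if key is not None:
            index.setdefault(key, []).append(d)

    rows = []
    for e in events:
        key = try_cast_bigint(e.get("DeviceID"))
        matches = index.get(key, []) if key is not None else []
        if matches:
            rows.extend((e, d) for d in matches)
        else:
            rows.append((e, None))  # LEFT JOIN keeps the unmatched event
    return rows


events = json.loads(
    '[{"DeviceID": 99999, "Entity": "HeatPumpTelemetry"},'
    ' {"DeviceID": "abc", "Entity": "HeatPumpTelemetry"}]'  # second event is made up
)
reference = json.loads(
    '[{"DeviceSourceCode": 99999, "DeviceID": 10, "DeviceTypeID": 1}]'
)

rows = left_join(events, reference)
for event, device in rows:
    print(event["DeviceID"], "->", device["DeviceTypeID"] if device else None)
```

With the sample messages above the keys match (99999 = 99999), and under LEFT JOIN semantics an unmatched event would still produce a row, so the join condition alone should not explain an empty table.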

ASA Reference Data Input configuration: [image: Reference Data input configuration in Stream Analytics]

BLOB storage directory tree: [image: Blob storage directory tree]

ASA test query output: [image: ASA test query output]

Did you have multiple "SELECT ... FROM ... WHERE ..." statements and reference data joins? Can you please try with just one and see if it makes any difference? Do the logs say the reference data was loaded? - Vignesh Chandramohan
Hi, regarding "Then I joined reference data to the data stream in ASA query using the same statement with named columns": please provide the query statement you used in this step. - Jay Gong
@Vignesh Chandramohan: where should I look for the logs that say the reference data was loaded? - matejp
@matejp Hi, any updates now? - Jay Gong
@JayGong: still no luck. I don't know where to look for the logs. I've updated my question with the requested clarification (ASA query and configuration...). - matejp

2 Answers

0
votes

@matejp, I could not reproduce your issue. You can refer to the steps I used below.

reference data in blob storage:

{
    "a":"aaa",
    "reference":"www.bing.com"
}

stream data in blob storage:

[
    {
        "id":"1",
        "name":"DeIdentified 1",
        "DeviceType":"aaa"      
    },
    {
        "id":"2",
        "name":"DeIdentified 2",
        "DeviceType":"No"     
    }
]

query statement:

SELECT
    inputSteam.*,inputRefer.*
into sqloutput
FROM
    inputSteam
Join inputRefer on inputSteam.DeviceType = inputRefer.a

Output:

[image: query output]

Hope it helps. If you have any concerns, let me know.

0
votes

I think I found the error. Over the past few days I tested nearly every possible combination when configuring inputs in Azure Stream Analytics.

I've started with this example as baseline: https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-build-an-iot-solution-using-stream-analytics

  • I tried the solution without any changes, to be sure that the example with the reference data input works -> it worked
  • Then I changed the ASA output from CosmosDB to a SQL table without changing anything else -> it worked
  • Then I changed my initial ASA job to be as much the "same" as the ASA job in the example as possible (writing into a SQL table) -> it worked
  • Then I started playing with BLOB directory names -> here I found the error.

I think the problem I encountered is caused by the "-" character in the folder name.

In my case I had created a folder named "reference-data" and uploaded a file named "devices.json" (folder structure "/reference-data/devices.json") -> ASA output to the SQL table didn't work. As soon as I changed the folder name to "refdata" (folder structure "/referencedata/devices.json") -> ASA output to the SQL table worked.

I tried three times, switching the reference data input between a folder name containing "-" and one without it. Every time, ASA output to SQL Server stopped working when "-" was in the folder name.

To recap:

  • I recommend not using "-" in BLOB folder names for static reference data inputs in ASA jobs.
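
A minimal sketch of a check for this, assuming the path is a plain "/folder/.../file" string. `folders_with_hyphen` is a hypothetical helper, and the rule it encodes reflects only my observation above, not a documented ASA limitation.

```python
def folders_with_hyphen(path_pattern):
    """Return the folder segments of a blob path that contain '-'.

    The last segment is treated as the file name and is not checked.
    This encodes an observed workaround, not a documented ASA rule.
    """
    segments = [s for s in path_pattern.split("/") if s]
    folders = segments[:-1]
    return [folder for folder in folders if "-" in folder]


failing = folders_with_hyphen("/reference-data/devices.json")
working = folders_with_hyphen("/refdata/devices.json")
print(failing, working)
```

An empty list means the path matches the naming that worked in my tests.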