0
votes

I have created the Stream Analytics job to read data from IOT hub as input and and to store data to SQL DB.

Here are the some important input details configured for Steam Analytics job are Event Serialization format: JSON Encoding :utf-8

The message is sent to IOT Hub from Dotnet simulated code.

When I am running my job I am getting the following error:

 Could not deserialize the input event as Json. Some possible reasons: 
 1) Malformed events 
 2) Input source configured with incorrect serialization format

And here is the my dotnet code.

       private static async void ReceiveC2dAsync()
    {
        Console.WriteLine("\nReceiving cloud to device messages from service");
        while (true)
        {
            Message receivedMessage = await deviceClient.ReceiveAsync();
            if (receivedMessage == null) continue;

            Console.ForegroundColor = ConsoleColor.Yellow;
            Console.WriteLine("Received message: {0}", Encoding.ASCII.GetString(receivedMessage.GetBytes()));
            Console.ResetColor();

            await deviceClient.CompleteAsync(receivedMessage);
        }
    }
    private static async void SendDeviceToCloudMessagesAsync()
    {
        double minTemperature = 20;
        double minHumidity = 60;
        int messageId = 1;
        Random rand = new Random();

        while (true)
        {
            double currentTemperature = minTemperature + rand.NextDouble() * 15;
            double currentHumidity = minHumidity + rand.NextDouble() * 20;

            var telemetryDataPoint = new
            {
                messageId = messageId++,
                deviceId = "myFirstDevice",
                temperature = currentTemperature,
                humidity = currentHumidity
            };
            var messageString = JsonConvert.SerializeObject(telemetryDataPoint);
            string levelValue;
            string temperatureAlert = "false";


            if (rand.NextDouble() > 0.7)
            {
                if (rand.NextDouble() > 0.5)
                {
                    messageString = "This is a critical message";
                    levelValue = "critical";
                }
                else
                {
                    messageString = "This is a storage message";
                    levelValue = "storage";
                }
            }
            else
            {
                levelValue = "normal";
            }

            if(currentTemperature > 30)
            {
                temperatureAlert = "true";
            }

            var message = new Message(Encoding.UTF8.GetBytes(messageString));
            message.Properties.Add("level", levelValue);
            message.Properties.Add("temperatureAlert", temperatureAlert);

            await deviceClient.SendEventAsync(message);
            Console.WriteLine("{0} > Sending message: {1}", DateTime.Now, messageString);

            await Task.Delay(1000);
        }
    }
1
it looks like your simulated device generated non-json formatted messages such as "This is a critical message" and "This is a storage message". Basically, you have to choices: 1. comment this part in the simulated code or 2. add the filter in the Azure IoT Hub Routes for these messages.Roman Kiss
@raghu456, you can try to save the json data to a file(name it *.json), and then upload the file to test.Please refer to Azure Stream Analytics query testing and input-stream sampling.If the data is in correct json format,it will be queried successfully.Michael Xu - MSFT
Thanks Roman for pointing the error. this resolved my issue.raghu456

1 Answers

1
votes

it looks like your simulated device generated non-json formatted messages such as "This is a critical message" and "This is a storage message".

Basically, you have two choices to fix this issue: 1. comment this part in the simulated code or 2. add the filter in the Azure IoT Hub Routes for these messages