
I have simulated devices sending messages to IoT Hub, which routes them to Blob storage, and from there I am copying the data (JSON-encoded) to Azure Data Lake Storage Gen2 with a pipeline built in Azure Data Factory.

How can I convert these JSON output files to CSV using an Azure Function and store them back in Azure Data Lake Store?


@Adam,

Thank you so much for all your answers; I implemented them successfully in my Azure account. However, this still does not give me the output I was looking for. I hope the following makes my requirement clear.

My input file, which is sent to IoT Hub:

(screenshot: sample input file)

Below are sample records of the data stored at the IoT Hub endpoint (Blob storage), as a JSON set of objects:

{"EnqueuedTimeUtc":"2019-08-06T10:46:39.4390000Z","Properties":{"$.cdid":"Simulated-File"},"SystemProperties":{"messageId":"d48413d2-d4d7-41bb-9470-dc0483466253","correlationId":"a3062fcb-5513-4c09-882e-8e642f8fe38e","connectionDeviceId":"Simulated-File","connectionAuthMethod":"{\"scope\":\"device\",\"type\":\"sas\",\"issuer\":\"iothub\",\"acceptingIpFilterRule\":null}","connectionDeviceGenerationId":"637001643970703748","contentType":"UTF-8","enqueuedTime":"2019-08-06T10:46:39.4390000Z"},"Body":"eyIiOiI1OCIsInJvdGF0ZSI6IjQ2Mi4wMjQxODE3IiwiZGF0ZXRpbWUiOiIxLzMvMjAxNSAxNjowMCIsIm1hY2hpbmVJRCI6IjEiLCJ2b2x0IjoiMTU2Ljk1MzI0NTkiLCJwcmVzc3VyZSI6IjEwNi4zNDY3MTc5IiwidmlicmF0aW9uIjoiNDguODIwMzAwODYifQ=="}

{"EnqueuedTimeUtc":"2019-08-06T10:46:40.5040000Z","Properties":{"$.cdid":"Simulated-File"},"SystemProperties":{"messageId":"9da638d9-fdba-41d3-86df-3ea6cedc44e7","correlationId":"aeb20305-6fee-4a59-9053-5fa1d0c780a9","connectionDeviceId":"Simulated-File","connectionAuthMethod":"{\"scope\":\"device\",\"type\":\"sas\",\"issuer\":\"iothub\",\"acceptingIpFilterRule\":null}","connectionDeviceGenerationId":"637001643970703748","contentType":"UTF-8","enqueuedTime":"2019-08-06T10:46:40.5040000Z"},"Body":"eyIiOiI1OSIsInJvdGF0ZSI6IjQyOS44MjIxNDM1IiwiZGF0ZXRpbWUiOiIxLzMvMjAxNSAxNzowMCIsIm1hY2hpbmVJRCI6IjEiLCJ2b2x0IjoiMTY0LjE0ODA5NDYiLCJwcmVzc3VyZSI6IjEwNC41MzIxMjM2IiwidmlicmF0aW9uIjoiNDMuNzg4NjgxNTUifQ=="}

The JSON `Body` field contains the actual IoT device data, base64-encoded, alongside some system and message properties.

The pipeline that copies JSON to CSV does not extract this actual data into Data Lake Store: after the ADF pipeline runs, the output CSV is an exact copy of the JSON file (the real data is not extracted).
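As a quick check, the `Body` of the first sample record can be base64-decoded to reveal the underlying telemetry JSON (plain .NET, no extra libraries needed; note the first telemetry field has an empty name):

```csharp
using System;
using System.Text;

public static class DecodeBodyDemo
{
    public static void Main()
    {
        // "Body" value copied from the first sample record above
        string body = "eyIiOiI1OCIsInJvdGF0ZSI6IjQ2Mi4wMjQxODE3IiwiZGF0ZXRpbWUiOiIxLzMvMjAxNSAxNjowMCIsIm1hY2hpbmVJRCI6IjEiLCJ2b2x0IjoiMTU2Ljk1MzI0NTkiLCJwcmVzc3VyZSI6IjEwNi4zNDY3MTc5IiwidmlicmF0aW9uIjoiNDguODIwMzAwODYifQ==";

        // Base64 -> UTF-8 string: the device telemetry as JSON
        string json = Encoding.UTF8.GetString(Convert.FromBase64String(body));
        Console.WriteLine(json);
        // {"":"58","rotate":"462.0241817","datetime":"1/3/2015 16:00","machineID":"1","volt":"156.9532459","pressure":"106.3467179","vibration":"48.82030086"}
    }
}
```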

(screenshot: https://i.stack.imgur.com/iCBvi.png)

(screenshot: https://i.stack.imgur.com/HaBMn.png)

I have tried the C# code below; it parses the outer JSON successfully, but I cannot read the data from the JSON `Body` field. Please assist me with this.

#r "Microsoft.WindowsAzure.Storage"
#r "System.Linq"
#r "Newtonsoft.Json"
#r "Microsoft.Azure.WebJobs.Extensions.Storage"
#r "Microsoft.Azure.WebJobs"
#r "Microsoft.Azure.WebJobs.Extensions"

using System;
using System.IO;
using System.Runtime.Serialization;
using System.Security.Authentication;
using System.Text;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Extensions;
using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage.Blob;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public class Telemetry
{
    public string num { get; set; }
    public string datetime { get; set; }
    public string machineID { get; set; }
    public string volt { get; set; }
    public string rotate { get; set; }
    public string pressure { get; set; }
    public string vibration { get; set; }
}

public static void Run(
    [BlobTrigger("blobcontainer/{name}")] string myBlob,
    //[Blob("bloboutput/{name}", FileAccess.Write)] Stream outputBlob,
    string name, ILogger log)
{
    log.LogInformation($"C# Blob trigger function processed blob\n Name: {name} \n Size: {myBlob.Length} bytes");

    var serializer = new JsonSerializer();
    // myBlob is a string, so wrap it in a StringReader (StreamReader expects a path or stream)
    using (var sr = new StringReader(myBlob))
    using (var jsonTextReader = new JsonTextReader(sr))
    {
        // This deserializes the outer envelope, so the Telemetry properties stay null:
        // the device data sits base64-encoded inside the "Body" field.
        var results = (Telemetry)serializer.Deserialize(jsonTextReader, typeof(Telemetry));
        // Do something with results.
    }
}
Comment (Alex Gordon): You need to use an integration account, put a schema and a map in it for your JSON-to-CSV transform, and then use a Logic App to actually execute that transformation.

1 Answer


I would really not save the messages to blob storage at all, but just leave them in IoT Hub and trigger the function off of that. But if you really need them from blob, then:

Make the function trigger like this:

[FunctionName("BlobTriggerCSharp")]
public static void Run(
    [BlobTrigger("input/{name}")] Stream myBlob,
    [Blob("output/{name}", FileAccess.Write)] Stream outputBlob,
    string name, ILogger log)
{
    // parse each JSON envelope with JsonConvert
    // base64-decode the "Body" field to get the device telemetry
    // write the parsed output as CSV to the outputBlob stream
}
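Filled in, that skeleton might look like the following sketch (my own assumption of the implementation, using Newtonsoft.Json; the field names match the decoded `Body` shown in the question, where the first telemetry field has an empty name):

```csharp
using System;
using System.IO;
using System.Text;
using Newtonsoft.Json.Linq;

public static class TelemetryCsv
{
    // Converts IoT Hub capture output (one JSON envelope per line) to CSV,
    // base64-decoding each envelope's "Body" to reach the device telemetry.
    public static void ConvertToCsv(TextReader input, TextWriter output)
    {
        output.WriteLine("num,datetime,machineID,volt,rotate,pressure,vibration");
        string line;
        while ((line = input.ReadLine()) != null)
        {
            if (string.IsNullOrWhiteSpace(line)) continue;
            var envelope = JObject.Parse(line);
            // Base64 -> UTF-8 JSON string with the actual device data
            string payload = Encoding.UTF8.GetString(
                Convert.FromBase64String((string)envelope["Body"]));
            var t = JObject.Parse(payload);
            // The first telemetry field has an empty name ("") in the sample data
            output.WriteLine(string.Join(",",
                t[""], t["datetime"], t["machineID"], t["volt"],
                t["rotate"], t["pressure"], t["vibration"]));
        }
    }

    public static void Main()
    {
        // Minimal envelope carrying the first sample record's "Body" value
        string sample = "{\"Body\":\"eyIiOiI1OCIsInJvdGF0ZSI6IjQ2Mi4wMjQxODE3IiwiZGF0ZXRpbWUiOiIxLzMvMjAxNSAxNjowMCIsIm1hY2hpbmVJRCI6IjEiLCJ2b2x0IjoiMTU2Ljk1MzI0NTkiLCJwcmVzc3VyZSI6IjEwNi4zNDY3MTc5IiwidmlicmF0aW9uIjoiNDguODIwMzAwODYifQ==\"}";
        var sw = new StringWriter();
        ConvertToCsv(new StringReader(sample), sw);
        Console.Write(sw.ToString());
        // num,datetime,machineID,volt,rotate,pressure,vibration
        // 58,1/3/2015 16:00,1,156.9532459,462.0241817,106.3467179,48.82030086
    }
}
```

Inside the blob trigger above, you could wrap `myBlob` in a `StreamReader` and `outputBlob` in a `StreamWriter` and pass them to `ConvertToCsv`.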

Then, in ADF:

I placed the sample data in a demo container on blob storage, as a file called demo.json.

I created a source dataset of type JSON in ADF:

(screenshot: creating the JSON source dataset)

which looks like this:

(screenshot: source dataset configuration)

You can test whether it works by pressing "Preview data":

(screenshot: data preview)

And a sink dataset as follows:

(screenshot: CSV sink dataset)

And then I simply made a copy activity:

(screenshots: copy activity configuration)
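For reference, the copy activity in the pipeline JSON would look roughly like this (a sketch with placeholder dataset names, not the exact definition from my screenshots):

```json
{
    "name": "CopyJsonToCsv",
    "type": "Copy",
    "inputs": [ { "referenceName": "JsonSourceDataset", "type": "DatasetReference" } ],
    "outputs": [ { "referenceName": "CsvSinkDataset", "type": "DatasetReference" } ],
    "typeProperties": {
        "source": { "type": "JsonSource" },
        "sink": { "type": "DelimitedTextSink" }
    }
}
```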

And after running it I got the JSON transformed to CSV. You can see it in blob storage:

(screenshots: CSV output in the blob container)

If you want to learn more, check my videos:

  1. Data Factory introduction: https://youtu.be/EpDkxTHAhOs
  2. Function App introduction: https://youtu.be/Vxf-rOEO1q4