2
votes

Let me preface this by saying I'm new to Logic Apps and Data Factory. I'm currently working on an integration, and one part of it requires triggering a Data Factory pipeline from Logic Apps. I've got that working; the part I can't figure out is how to pass parameters to the pipeline. I've tried altering the JSON under both the "parameters" and "triggers" sections, but nothing has worked so far. The pipeline executes, but only with the default parameter values.

Has anybody had any success in doing this yet? Any help is appreciated.

3
Are you using the REST API to trigger the Data Factory pipeline run? docs.microsoft.com/en-us/azure/data-factory/…. It may be useful in your case. Does the Logic App connector not support parameters? - Thomas
I may need to play around with that, perhaps using the HTTP action in Logic Apps? Mostly I'm trying to find out whether I can actually use parameters with the ADF connector, which it sounds like I can't? - Mike R
Yeah, the HTTP action should work. - Thomas
Hi Mike, I managed to do this with a combination of an Azure Function and a Logic App. I wrote a function that runs the pipeline; with the .NET SDK you can pass parameters. - DraganB
@wBob you can check my answer and see if it fits you :) - DraganB

3 Answers

2
votes

You can use the body property of the logic app's "Create a pipeline run" action to pass parameters to the pipeline. As always, be careful because not only is this action in preview but I also couldn't find this solution in any MS documentation. I just made an educated guess based on how other similar actions are formatted.

Example:

"Run_my_pipeline": {
  "inputs": {
    "host": {
      "connection": {
        "name": "@parameters('$connections')['azuredatafactory']['connectionId']"
      }
    },
    "method": "post",
    "body": {
      "param1": "myParamValue",
      "param2": "myParamValue"
    },
    "path": "...",
    "queries": {
      "x-ms-api-version": "2017-09-01-preview"
    },
    "authentication": "@parameters('$authentication')"
  }
}
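
For the body keys to have any effect, they presumably need to match parameter names declared on the pipeline itself. As a sketch only, assuming a pipeline named "MyPipeline" (a made-up name) that declares the two parameters used above with placeholder defaults, the relevant part of its JSON definition would look something like this:

```json
{
  "name": "MyPipeline",
  "properties": {
    "parameters": {
      "param1": { "type": "String", "defaultValue": "defaultValue1" },
      "param2": { "type": "String", "defaultValue": "defaultValue2" }
    },
    "activities": []
  }
}
```

Inside the pipeline's activities, the values are read with expressions such as @pipeline().parameters.param1. Any parameter not supplied in the body falls back to its defaultValue, which would explain a run that only ever sees the defaults.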

1
votes

As I said in the comment, I created a workaround with Azure Functions. Azure Functions and Logic Apps work well together. This link shows how to create and manage pipelines with .NET: https://docs.microsoft.com/en-us/azure/data-factory/quickstart-create-data-factory-dot-net

If you already have a data factory and a pipeline and just want to run it (with parameters), you can do this:

        // Build the pipeline parameters from the incoming request
        Dictionary<string, object> parameters = new Dictionary<string, object>
        {
            {"BoxSerialNumbers", req.BoxSerialNumbers},
            {"StartDate", req.StartDate},
            {"EndDate", req.EndDate},
            {"Recipient", req.Recipient}
        };

        var client = Authenticate(); // authenticate against Azure
        log.Info("Creating.");
        // Start the pipeline run; the async variant is preferable
        CreateRunResponse runResponse = client.Pipelines.CreateRun(resourceGroup, dataFactoryName, "pipeline1", parameters);
        log.Info("Created.");
        var response = new HttpResponseMessage();
        if (client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId).Status.Equals("InProgress"))
        {
            // Return the run ID so the caller can check the run's status later
            response = new HttpResponseMessage(HttpStatusCode.OK)
            {
                Content = new StringContent(runResponse.RunId, Encoding.UTF8)
            };
        }
        else
        {
            // Simple validation: the run was not reported as in progress
            response = new HttpResponseMessage(HttpStatusCode.BadRequest)
            {
                Content = new StringContent("Pipeline didn't start", Encoding.UTF8)
            };
        }
        return response;


    public static DataFactoryManagementClient Authenticate()
    {
        var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
        ClientCredential cc = new ClientCredential(applicationID, authenticationKey);
        AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
        ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
        return new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionID };
    }

So you can pass your parameters from the Logic App in the request, and use the returned run ID to check the run's status. In the Logic App, a simple HTTP request is all it takes to call this function. Hope this helps someone.
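
As a rough sketch of that last step, the Logic App HTTP action could look something like the following. The action name, the function URL, and the function key are placeholders invented for illustration. The body fields simply mirror the parameter names in the dictionary above and assume the Logic App's trigger payload carries the same fields:

```json
"Call_pipeline_function": {
  "type": "Http",
  "inputs": {
    "method": "POST",
    "uri": "https://<function-app-name>.azurewebsites.net/api/<function-name>?code=<function-key>",
    "headers": {
      "Content-Type": "application/json"
    },
    "body": {
      "BoxSerialNumbers": "@triggerBody()?['BoxSerialNumbers']",
      "StartDate": "@triggerBody()?['StartDate']",
      "EndDate": "@triggerBody()?['EndDate']",
      "Recipient": "@triggerBody()?['Recipient']"
    }
  },
  "runAfter": {}
}
```

Since the function returns the run ID as the response body, a later action should be able to read it with @body('Call_pipeline_function').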

1
votes

I used DraganB's solution but the call signature on

CreateRunResponse runResponse = client.Pipelines.CreateRun(resourceGroup, dataFactoryName, "pipeline1", parameters);

has changed. Minor edits make this work perfectly:

CreateRunResponse runResponse = client.Pipelines.CreateRun(resourceGroup, dataFactoryName, "pipeline1", parameters: parameters);

Here's the function for anyone that needs it.

    [FunctionName("DatafactoryShim")]
    public static async Task<HttpResponseMessage> Run(
        [HttpTrigger(AuthorizationLevel.Function, "post")]
        HttpRequestMessage req,
        ExecutionContext context,
        TraceWriter log
    )
    {
        string messageBody = await req.Content.ReadAsStringAsync();

        BlobToDatalakeFactoryParameters postValues = JsonHelper.ToClass<BlobToDatalakeFactoryParameters>(messageBody);

        Dictionary<string, object> parameters = new Dictionary<string, object>
        {
            {"blobContainer", postValues.BlobContainer},
            {"blobFolder", postValues.BlobFolder },
            {"relativeDatalakeFolder", postValues.RelativeDatalakeFolder },
            {"modelType", postValues.ModelType }

        }; // initialize the pipeline parameters

        var client = Authenticate(); //Authentication with azure

        string resourceGroup = ConfigurationManager.AppSettings["resourceGroup"];
        string dataFactoryName = ConfigurationManager.AppSettings["dataFactoryName"];
        string pipelineName = ConfigurationManager.AppSettings["pipelineName"];

        log.Info("Creating pipeline run...");
        // Await the call instead of blocking on .Result inside an async method
        CreateRunResponse runResponse = (await client.Pipelines.CreateRunWithHttpMessagesAsync(
            resourceGroup,
            dataFactoryName,
            pipelineName,
            parameters: parameters)).Body;
        log.Info("Pipeline run ID: " + runResponse.RunId);

        var response = new HttpResponseMessage();

        if (client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId)
            .Status.Equals("InProgress"))
        {
            response = new HttpResponseMessage(HttpStatusCode.OK)
            {
                Content = new StringContent(runResponse.RunId, Encoding.UTF8)
            };
        }
        else
        {
            response = new HttpResponseMessage(HttpStatusCode.BadRequest)
            {
                // Simple validation: the run was not reported as in progress
                Content = new StringContent("Pipeline didn't start", Encoding.UTF8)
            };
        }

        return response;
    }
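
JsonHelper.ToClass and BlobToDatalakeFactoryParameters are not shown above. Assuming the helper maps JSON property names directly onto the class's properties (an assumption, since neither is defined here), the POST body sent to the function might look like this, with all values being made-up placeholders:

```json
{
  "BlobContainer": "my-container",
  "BlobFolder": "incoming/2018",
  "RelativeDatalakeFolder": "raw/models",
  "ModelType": "example"
}
```

Each property ends up as one entry in the parameters dictionary, so the pipeline needs matching parameters named blobContainer, blobFolder, relativeDatalakeFolder and modelType.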