I am trying to migrate a pipeline that already exists in ADF v1 to ADF v2 and am having some issues with the concept of triggers. My pipeline has two activities: the first is an Azure Data Lake Analytics (U-SQL) activity and the second is a copy activity. The first activity runs a U-SQL script that reads data from the partitioned folder /{yyyy}/{MM}/{dd}/, processes it, and writes the result to the folder /{yyyy}-{MM}-{dd}/. Here are some JSON files from my factory (pipeline, trigger, and datasets).
Pipeline:
{
  "name": "StreamCompressionBlob2SQL",
  "properties": {
    "activities": [
      {
        "name": "compress",
        "type": "DataLakeAnalyticsU-SQL",
        "policy": {
          "timeout": "7.00:00:00",
          "retry": 0,
          "retryIntervalInSeconds": 30,
          "secureOutput": false,
          "secureInput": false
        },
        "typeProperties": {
          "scriptPath": "d00044653/azure-configurations/usql-scripts/stream/compression.usql",
          "scriptLinkedService": {
            "referenceName": "AzureBlobStorage",
            "type": "LinkedServiceReference"
          },
          "parameters": {
            "Year": {
              "value": "@formatDateTime(pipeline().parameters.windowStartTime,'yyyy')",
              "type": "Expression"
            },
            "Month": {
              "value": "@formatDateTime(pipeline().parameters.windowStartTime,'MM')",
              "type": "Expression"
            },
            "Day": {
              "value": "@formatDateTime(pipeline().parameters.windowStartTime,'dd')",
              "type": "Expression"
            }
          }
        },
        "linkedServiceName": {
          "referenceName": "AzureDataLakeAnalytics1",
          "type": "LinkedServiceReference"
        }
      },
      {
        "name": "Blob2SQL",
        "type": "Copy",
        "dependsOn": [
          {
            "activity": "compress",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ],
        "policy": {
          "timeout": "7.00:00:00",
          "retry": 0,
          "retryIntervalInSeconds": 30,
          "secureOutput": false,
          "secureInput": false
        },
        "typeProperties": {
          "source": {
            "type": "BlobSource",
            "recursive": true
          },
          "sink": {
            "type": "SqlSink",
            "writeBatchSize": 10000
          },
          "enableStaging": false,
          "dataIntegrationUnits": 0,
          "translator": {
            "type": "TabularTranslator",
            "columnMappings": {
              "tag": "TAG",
              "device_id": "DEVICE_ID",
              "system_id": "SYSTEM_ID",
              "utc": "UTC",
              "ts": "TS",
              "median": "MEDIAN",
              "min": "MIN",
              "max": "MAX",
              "avg": "AVG",
              "stdev": "STDEV",
              "first_value": "FIRST_VALUE",
              "last_value": "LAST_VALUE",
              "message_count": "MESSAGE_COUNT"
            }
          }
        },
        "inputs": [
          {
            "referenceName": "AzureBlobDataset_COMPRESSED_ASA_v1",
            "type": "DatasetReference",
            "parameters": {
              "Year": {
                "value": "@formatDateTime(pipeline().parameters.windowStartTime,'yyyy')",
                "type": "Expression"
              },
              "Month": {
                "value": "@formatDateTime(pipeline().parameters.windowStartTime,'MM')",
                "type": "Expression"
              },
              "Day": {
                "value": "@formatDateTime(pipeline().parameters.windowStartTime,'dd')",
                "type": "Expression"
              }
            }
          }
        ],
        "outputs": [
          {
            "referenceName": "AzureSQLDataset_T_ASSET_MONITORING_WARM_ASA_v1",
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "windowStartTime": {
        "type": "String"
      }
    }
  }
}
Trigger:
{
  "name": "trigger1",
  "properties": {
    "runtimeState": "Started",
    "pipelines": [
      {
        "pipelineReference": {
          "referenceName": "StreamCompressionBlob2SQL",
          "type": "PipelineReference"
        },
        "parameters": {
          "windowStartTime": "@trigger().scheduledTime"
        }
      }
    ],
    "type": "ScheduleTrigger",
    "typeProperties": {
      "recurrence": {
        "frequency": "Day",
        "interval": 1,
        "startTime": "2018-08-17T10:46:00.000Z",
        "endTime": "2018-11-04T10:46:00.000Z",
        "timeZone": "UTC"
      }
    }
  }
}
Input Dataset for Copy Activity:
{
  "name": "AzureBlobDataset_COMPRESSED_ASA_v1",
  "properties": {
    "linkedServiceName": {
      "referenceName": "AzureBlobStorage",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "Year": {
        "type": "String",
        "defaultValue": "@formatDateTime(pipeline().parameters.windowStartTime,'yyyy')"
      },
      "Month": {
        "type": "String",
        "defaultValue": "@formatDateTime(pipeline().parameters.windowStartTime,'MM')"
      },
      "Day": {
        "type": "String",
        "defaultValue": "@formatDateTime(pipeline().parameters.windowStartTime,'dd')"
      }
    },
    "type": "AzureBlob",
    "structure": [
      { "name": "tag", "type": "String" },
      { "name": "device_id", "type": "String" },
      { "name": "system_id", "type": "String" },
      { "name": "utc", "type": "DateTime" },
      { "name": "ts", "type": "DateTime" },
      { "name": "median", "type": "Double" },
      { "name": "min", "type": "Double" },
      { "name": "max", "type": "Double" },
      { "name": "avg", "type": "Double" },
      { "name": "stdev", "type": "Double" },
      { "name": "first_value", "type": "Double" },
      { "name": "last_value", "type": "Double" },
      { "name": "message_count", "type": "Int16" }
    ],
    "typeProperties": {
      "format": {
        "type": "TextFormat",
        "columnDelimiter": ";",
        "nullValue": "\\N",
        "treatEmptyAsNull": true,
        "skipLineCount": 0,
        "firstRowAsHeader": true
      },
      "fileName": "",
      "folderPath": {
        "value": "@concat('d00044653/processed/stream/compressed',dataset().Year,'-',dataset().Month,'-',dataset().Day)",
        "type": "Expression"
      }
    }
  },
  "type": "Microsoft.DataFactory/factories/datasets"
}
Output Dataset for Copy Activity:
{
  "name": "AzureSQLDataset_T_ASSET_MONITORING_WARM_ASA_v1",
  "properties": {
    "linkedServiceName": {
      "referenceName": "AzureSqlDatabase1",
      "type": "LinkedServiceReference"
    },
    "type": "AzureSqlTable",
    "structure": [
      { "name": "TAG", "type": "String" },
      { "name": "DEVICE_ID", "type": "String" },
      { "name": "SYSTEM_ID", "type": "String" },
      { "name": "UTC", "type": "DateTime" },
      { "name": "TS", "type": "DateTime" },
      { "name": "MEDIAN", "type": "Decimal" },
      { "name": "MIN", "type": "Decimal" },
      { "name": "MAX", "type": "Decimal" },
      { "name": "AVG", "type": "Decimal" },
      { "name": "STDEV", "type": "Decimal" },
      { "name": "FIRST_VALUE", "type": "Decimal" },
      { "name": "LAST_VALUE", "type": "Decimal" },
      { "name": "MESSAGE_COUNT", "type": "Int32" }
    ],
    "typeProperties": {
      "tableName": "[dbo].[T_ASSET_MONITORING_WARM]"
    }
  },
  "type": "Microsoft.DataFactory/factories/datasets"
}
My problem is that nothing happens after publishing: the pipeline never runs. Any suggestions?
"startTime": "2018-08-17T10:46:00.000Z", "endTime": "2018-11-04T10:46:00.000Z",
It ended on Nov 4? – Bo Xiao