0
votes

I have an Azure Data Factory Copy Activity within a pipeline. The copy activity works, but the data is copied multiple times. My data source is an Azure NoSQL DB (DocumentDB). How do I configure the Copy Activity so that it does not recopy a record?

Here is my activity:

{
  "name": "Copy Usage Session Data",
  "properties": 
  {
    "description": "",
    "activities": 
    [
      {
        "type": "Copy",
        "typeProperties": 
        {
          "source": {"type": "DocumentDbCollectionSource"},
          "sink": 
          {
            "type": "SqlSink",
            "writeBatchSize": 0,
            "writeBatchTimeout": "05:00:00",
            "sliceIdentifierColumnName": "InstallationSliceIdentifier"
          },
          "translator": 
          {
            "type": "TabularTranslator",
            "ColumnMappings": "machineKey: machineKey, product: product, softwareVersion: softwareVersion, id: DocumentDBId"
          }

        },
        "inputs": [{"name": "Machine Registration Input Data"}],
        "outputs": [{"name": "Machine Registration Output Data"}],
        "policy": 
        {
          "timeout": "01:00:00",
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst"
        },
        "scheduler": 
        {
          "frequency": "Hour",
          "interval": 1
        },
        "name": "Machine Registration Data To History",
        "description": "Copy Machine Registration Data To SQL Server DB Activity"
      },
      {
        "type": "Copy",
        "typeProperties": 
        {
          "source": {"type": "DocumentDbCollectionSource"},
          "sink": 
          {
            "type": "SqlSink",
            "writeBatchSize": 0,
            "writeBatchTimeout": "05:00:00",
            "sliceIdentifierColumnName": "UsageSessionSliceIdentifier"
          },
          "translator": 
          {
            "type": "TabularTranslator",
            "ColumnMappings": "id: usageSessionId, usageInstallationId: usageInstallationId, startTime: startTime, stopTime: stopTime, currentVersion: currentVersion"
          }
        },
        "inputs": [{"name": "Usage Session Input Data"}],
        "outputs": [{"name": "Usage Session Output Data"}],
        "policy": 
        {
          "timeout": "01:00:00",
          "concurrency": 2,
          "executionPriorityOrder": "OldestFirst"
        },
        "scheduler": 
        {
          "frequency": "Hour",
          "interval": 1
        },
        "name": "Usage Session Data To History",
        "description": "Copy Usage Session Data To SQL Server DB Activity"
      }
    ],
    "start": "2017-05-29T16:15:00Z",
    "end": "2500-01-01T00:00:00Z",
    "isPaused": false,        
    "pipelineMode": "Scheduled"
  }
}

2
Can you share your copy activity JSON template? Without it, it's hard to provide a solution. – Venky
I want to add my activity, but it is too long for a comment and I can't edit the question. How do I edit the question? I see no edit tag. I read about the sqlReaderQuery option for filtering with a query, but the examples only show filtering on the dataset/window time slice. My source is a NoSQL DB, and I read that only SQL sources can use this. On top of that, the timestamp in my DB collection is stored as an offset from 1970/1/1. I can't find a clear description of sqlReaderQuery. Where can I find out how to use it? – Peter
You need to add a query that picks the data slice you want to copy. Because you didn't specify a query, both activities copy the whole data set from the NoSQL source to the sink, so it looks as if the data is copied twice. Use SliceStart and SliceEnd in your query. – Venky
I was able to get the copy wizard to go through, and it generated a pipeline for review. The one thing that stood out was that sqlReaderQuery is invalid with DocumentDB; the attribute must be named query. Not sure if this is the standard name now or specific to DocumentDB. – Peter
Yes, you need to use the query parameter to specify the data slice you want to copy. You also need to specify nestingSeparator as per DocDB standards. I updated my answer with an example below. Hope it helps. – Venky

2 Answers

1
votes

Change the pipeline start date to the current date. If the pipeline start date is in the past, a data slice is created for every interval between that date and the current date, and each of those slices is copied. Also, you have set concurrency to 2, which means two copy activity runs execute at a time.

For example, if your output dataset availability is 1 day and your pipeline start date is 29-05-2017, then as of today (16-06-2017) a total of 18 data slices will have been created, one per day. If you set concurrency to 2, two copy activity runs execute at a time. If you set concurrency to 10, ten copy activity runs execute in parallel.

Be careful with the output dataset availability, pipeline start date, concurrency, and source query.
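As a rough sketch of where those settings live, the fragment below reuses the second activity from the question with only the scheduling-related properties shown (typeProperties, inputs, and outputs are omitted for brevity). The start value is an assumption: it is simply the date this answer was written, and concurrency is lowered to 1 so that one slice is processed at a time.

```json
{
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "name": "Usage Session Data To History",
        "policy": {
          "timeout": "01:00:00",
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      }
    ],
    "start": "2017-06-16T00:00:00Z",
    "end": "2500-01-01T00:00:00Z"
  }
}
```

Note that with an hourly schedule like the one in the question, a start date in the past would produce roughly 24 slices per day rather than one, so the backlog grows much faster than in the daily example above.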

An example of a source query is $$Text.Format('select * from c where c.ModifiedDate >= \'{0:yyyy-MM-ddTHH:mm:ssZ}\' AND c.ModifiedDate < \'{1:yyyy-MM-ddTHH:mm:ssZ}\'', WindowStart, WindowEnd), where ModifiedDate is a property that records when each document in that collection was created.

Updated:
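Placed in the source section of a copy activity, that query might look like the sketch below. This assumes the collection has a ModifiedDate property stored as an ISO 8601 string; the property name is illustrative, and a collection that stores timestamps as an offset from 1970/1/1 would need a different comparison. The doubled backslashes are JSON string escaping for the \' sequences in the expression.

```json
{
  "source": {
    "type": "DocumentDbCollectionSource",
    "query": "$$Text.Format('select * from c where c.ModifiedDate >= \\'{0:yyyy-MM-ddTHH:mm:ssZ}\\' AND c.ModifiedDate < \\'{1:yyyy-MM-ddTHH:mm:ssZ}\\'', WindowStart, WindowEnd)",
    "nestingSeparator": "."
  }
}
```

Because the lower bound is inclusive and the upper bound is exclusive, a document whose ModifiedDate falls exactly on a window boundary should be picked up by one slice only.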

{
  "name": "DocDbToBlobPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "DocumentDbCollectionSource",
            "query": "SELECT Person.Id, Person.Name.First AS FirstName, Person.Name.Middle as MiddleName, Person.Name.Last AS LastName FROM Person",
            "nestingSeparator": "."
          },
          "sink": {
            "type": "BlobSink",
            "blobWriterAddHeader": true,
            "writeBatchSize": 1000,
            "writeBatchTimeout": "00:00:59"
          }
        },
        "inputs": [
          {
            "name": "PersonDocumentDbTable"
          }
        ],
        "outputs": [
          {
            "name": "PersonBlobTableOut"
          }
        ],
        "policy": {
          "concurrency": 1
        },
        "name": "CopyFromDocDbToBlob"
      }
    ],
    "start": "2015-04-01T00:00:00Z",
    "end": "2015-04-02T00:00:00Z"
  }
} 

For reference, have a look at the Data Factory scheduling and execution documentation.

1
votes

You can use a query on a created/modified date (the property must exist in your collection) and pick only the records for the current date. The date range is provided by the slice start and end, so each daily run reads only the newly created records.