8 votes

Hi, I am using 16 collections to insert around 3-4 million JSON objects, ranging from 5-10 KB per object. I am using a stored procedure to insert these documents, and I have 22 Capacity Units.

function bulkImport(docs) {
    var collection = getContext().getCollection();
    var collectionLink = collection.getSelfLink();

    // The count of imported docs, also used as current doc index.
    var count = 0;

    // Validate input.
    if (!docs) throw new Error("The array is undefined or null.");

    var docsLength = docs.length;
    if (docsLength == 0) {
        getContext().getResponse().setBody(0);
        return;
    }

    // Call the CRUD API to create a document.
    tryCreateOrUpdate(docs[count], callback);

    // Note that there are 2 exit conditions:
    // 1) The createDocument request was not accepted. 
    //    In this case the callback will not be called, we just call setBody and we are done.
    // 2) The callback was called docs.length times.
    //    In this case all documents were created and we don't need to call tryCreate anymore. Just call setBody and we are done.
    function tryCreateOrUpdate(doc, callback) {
        var isAccepted = true;
        // Use a parameterized query so ids containing quotes cannot break the filter.
        var isFound = collection.queryDocuments(collectionLink,
            { query: 'SELECT * FROM root r WHERE r.id = @id', parameters: [{ name: '@id', value: doc.id }] },
            function (err, feed, options) {
            if (err) throw err;
            if (!feed || !feed.length) {
                isAccepted = collection.createDocument(collectionLink, doc, callback);
            }
            else {
                // The metadata document.
                var existingDoc = feed[0];
                isAccepted = collection.replaceDocument(existingDoc._self, doc, callback);
            }
        });

        // If the request was accepted, callback will be called.
        // Otherwise report current count back to the client, 
        // which will call the script again with remaining set of docs.
        // This condition will happen when this stored procedure has been running too long
        // and is about to get cancelled by the server. This will allow the calling client
        // to resume this batch from the point we got to before isAccepted was set to false
        if (!isFound && !isAccepted) getContext().getResponse().setBody(count);
    }

    // This is called when collection.createDocument is done and the document has been persisted.
    function callback(err, doc, options) {
        if (err) throw err;

        // One more document has been inserted, increment the count.
        count++;

        if (count >= docsLength) {
            // If we have created all documents, we are done. Just set the response.
            getContext().getResponse().setBody(count);
        } else {
            // Create next document.
            tryCreateOrUpdate(docs[count], callback);
        }
    }
}

My C# code looks like this:

public async Task<int> Add(List<JobDTO> entities)
{
    int currentCount = 0;
    int documentCount = entities.Count;

    while (currentCount < documentCount)
    {
        string argsJson = JsonConvert.SerializeObject(entities.Skip(currentCount).ToArray());
        var args = new dynamic[] { JsonConvert.DeserializeObject<dynamic[]>(argsJson) };

        // Execute the batch.
        StoredProcedureResponse<int> scriptResult = await DocumentDBRepository.Client.ExecuteStoredProcedureAsync<int>(sproc.SelfLink, args);

        // Prepare for the next batch.
        int currentlyInserted = scriptResult.Response;
        currentCount += currentlyInserted;
    }

    return currentCount;
}

The problem I am facing is that out of the 400k documents I try to insert, some documents get missed at times without any error being raised.

The application is a worker role deployed to the cloud. If I increase the number of threads or instances inserting into DocumentDB, the number of missed documents is much higher.

How do I figure out what the problem is? Thanks in advance.

Is there anything helpful at this question (talks about exceptions you might get) or this question (gives a code example that you might have seen already)? — shoover

Yeah, I have seen that already. For some unknown reason DocumentDB skips adding documents when the inserting is done in bulk. I am talking of a million documents here, and it is kind of sporadic. — varunpathak

Could you please email me so that we can dig into this some more? We would need some details from you, such as the endpoint name, activity IDs, etc. — Ryan CrawCour

Yes, sure. I am still looking at different aspects and will come back; it looks like we are on track to resolve it. I will post the different changes that I made to overcome the issue. — varunpathak

Here are the changes made to my code: 1) instead of just firing the stored procedure as a task, I now use a Wait() to make sure the task is completed; 2) I now catch not only AggregateException but also Exception; 3) tuned to D2 machines with 2 instances and 16 concurrent calls for the message options; 4) used DocumentDB Standard S1 with a 1:1 collection-to-Capacity-Unit ratio. Thanks for all the help. — varunpathak
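For illustration, changes 1 and 2 from that comment might look roughly like this. This is only a sketch: AddBatch is a hypothetical wrapper name, and Add, JobDTO, and DocumentDBRepository follow the question's code.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical wrapper around the question's Add method.
public int AddBatch(List<JobDTO> entities)
{
    int inserted = 0;
    try
    {
        // 1) Wait() for the task instead of fire-and-forget, so failures surface here.
        Task<int> task = Add(entities);
        task.Wait();
        inserted = task.Result;
    }
    catch (AggregateException ae)
    {
        // 2) Unwrap the AggregateException that Wait() throws and log each inner exception.
        foreach (Exception inner in ae.Flatten().InnerExceptions)
            Trace.TraceError(inner.ToString());
    }
    catch (Exception ex)
    {
        // ...and also catch anything thrown synchronously.
        Trace.TraceError(ex.ToString());
    }
    return inserted;
}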

3 Answers

10 votes

I found that when trying this code I would get an error at docs.length which stated that length was undefined.

function bulkImport(docs) {
    var collection = getContext().getCollection();
    var collectionLink = collection.getSelfLink();

    // The count of imported docs, also used as current doc index.
    var count = 0;

    // Validate input.
    if (!docs) throw new Error("The array is undefined or null.");

    var docsLength = docs.length; // length is undefined
}

After many tests (I could not find anything in the Azure documentation), I realized that I could not pass an array as was suggested; the parameter had to be an object. I had to modify the batch code like this in order for it to run.

I also found that I could not simply pass an array of documents in the DocumentDB Script Explorer (Input box) either, even though the placeholder help text says you can.

This code worked for me:

// pseudo object for reference only
docObject = {
  "items": [{doc}, {doc}, {doc}]
}

function bulkImport(docObject) {
    var context = getContext();
    var collection = context.getCollection();
    var collectionLink = collection.getSelfLink();
    var count = 0;

    // Check input
    if (!docObject.items || !docObject.items.length) throw new Error("invalid document input parameter or undefined.");
    var docs = docObject.items;
    var docsLength = docs.length;
    if (docsLength == 0) {
        context.getResponse().setBody(0);
        return;
    }

    // Call the funct to create a document.
    tryCreateOrUpdate(docs[count], callback);

    // Obviously I have truncated this function. The above code should help you understand what has to change.
}
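For completeness, the matching client-side call might look like this. This is a sketch only; sproc, entities, and DocumentDBRepository follow the naming in the question's code, and the only real change is the anonymous wrapper object.

// Wrap the array in an object so the stored procedure receives a single
// parameter with an "items" property, not a bare array.
string argsJson = JsonConvert.SerializeObject(new { items = entities });
var args = new dynamic[] { JsonConvert.DeserializeObject<dynamic>(argsJson) };

StoredProcedureResponse<int> scriptResult =
    await DocumentDBRepository.Client.ExecuteStoredProcedureAsync<int>(sproc.SelfLink, args);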

Hopefully Azure documentation will catch up or become easier to find if I missed it.

I'll also be filing a bug report for the Script Explorer in hopes that the Azurites will update it.

4 votes

It's important to note that stored procedures have bounded execution: every operation must complete within the server-specified request timeout. If an operation does not complete within that time limit, the transaction is automatically rolled back. To simplify handling the time limit, all CRUD (Create, Read, Update, and Delete) operations return a Boolean value that indicates whether the operation will complete. This Boolean can be used as a signal to wrap up execution and to implement a continuation-based model for resuming execution (this is illustrated in the code samples below).

The bulk-insert stored procedure provided above implements the continuation model by returning the number of documents successfully created. This is noted in the stored procedure's comments:

    // If the request was accepted, callback will be called.
    // Otherwise report current count back to the client, 
    // which will call the script again with remaining set of docs.
    // This condition will happen when this stored procedure has been running too long
    // and is about to get cancelled by the server. This will allow the calling client
    // to resume this batch from the point we got to before isAccepted was set to false
    if (!isFound && !isAccepted) getContext().getResponse().setBody(count);

If the output document count is less than the input document count, you will need to re-run the stored procedure with the remaining set of documents.
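Client-side, that re-run loop can mirror the question's own code, with one extra guard I would suggest: if a call makes no progress at all, stop and investigate (throttling, for example) rather than spinning forever. A sketch, reusing the question's entities, sproc, and DocumentDBRepository names:

int currentCount = 0;
while (currentCount < entities.Count)
{
    // Send only the documents that have not been created yet.
    var remaining = entities.Skip(currentCount).ToArray();
    StoredProcedureResponse<int> scriptResult =
        await DocumentDBRepository.Client.ExecuteStoredProcedureAsync<int>(sproc.SelfLink, new dynamic[] { remaining });

    // The stored procedure returns how many documents it managed to create.
    int created = scriptResult.Response;
    if (created == 0)
        throw new InvalidOperationException("No progress made; check for throttling (HTTP 429) or oversized batches.");

    currentCount += created;
}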

2 votes

Since May 2018 there is a new Batch SDK for Cosmos DB. There is a GitHub repo to get you started.

I have been able to import 100,000 records in 9 seconds. Using Azure Batch to fan out the inserts, I have done 19 million records in 1 minute 15 seconds. This was on a collection scaled to 1.66 million RU/s, which you can obviously scale down after the import.
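As a rough sketch (assuming the Microsoft.Azure.CosmosDB.BulkExecutor NuGet package; check the linked GitHub repo for the current API and signatures), usage looks something like this:

using Microsoft.Azure.CosmosDB.BulkExecutor;

// client is an existing DocumentClient; collection is the target DocumentCollection.
IBulkExecutor bulkExecutor = new BulkExecutor(client, collection);
await bulkExecutor.InitializeAsync();

var response = await bulkExecutor.BulkImportAsync(
    documents: docs,        // your IEnumerable of JSON-serializable objects
    enableUpsert: true);    // replace documents whose id already exists

Console.WriteLine("Imported " + response.NumberOfDocumentsImported + " documents, " +
                  "consumed " + response.TotalRequestUnitsConsumed + " RUs.");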