I'm running a script with the Python SDK that programmatically bulk upserts 1.5 million documents into a collection in Azure Cosmos DB. I'm using the bulk import sproc from the samples in the GitHub repo (https://github.com/Azure/azure-cosmosdb-js-server/tree/master/samples/stored-procedures). The only change is that I've swapped collection.createDocument for collection.upsertDocument. I've included my sproc in full below.

The stored procedure does run successfully: it upserts documents consistently and relatively quickly. However, this only holds until around 30% progress, at which point this error is thrown:

CosmosHttpResponseError: (RequestTimeout) Message: {"Errors":["The requested operation exceeded maximum alloted time. Learn more: https://aka.ms/cosmosdb-tsg-service-request-timeout"]}
ActivityId: 9f2357c6-918c-4b67-ba20-569034bfde6f, Request URI: /apps/4a997bdb-7123-485a-9808-f952db2b7e52/services/a7c137c6-96b8-4b53-a20c-b9577981b353/partitions/305a8287-11d1-43f8-be1f-983bd4c4a63d/replicas/132488328092882514p/, RequestStats:
RequestStartTime: 2020-11-03T23:43:59.9158203Z, RequestEndTime: 2020-11-03T23:44:05.3858559Z, Number of regions attempted:1
ResponseTime: 2020-11-03T23:44:05.3858559Z, StoreResult: StorePhysicalAddress: rntbd://cdb-ms-prod-centralus1-fd22.documents.azure.com:14354/apps/4a997bdb-7123-485a-9808-f952db2b7e52/services/a7c137c6-96b8-4b53-a20c-b9577981b353/partitions/305a8287-11d1-43f8-be1f-983bd4c4a63d/replicas/132488328092882514p/, LSN: -1, GlobalCommittedLsn: -1, PartitionKeyRangeId: , IsValid: False, StatusCode: 408, SubStatusCode: 0, RequestCharge: 0, ItemLSN: -1, SessionToken: , UsingLocalLSN: False, TransportException: null, ResourceType: StoredProcedure, OperationType: ExecuteJavaScript, SDK: Microsoft.Azure.Documents.Common/2.11.0

Is there a way to add retry logic or to extend the timeout period for bulk upserts? I believe this section of the sproc below, if (!isAccepted) getContext().getResponse().setBody(count);, is supposed to help with this scenario, but it doesn't seem to work in my case.

Bulk upsert stored procedure in JavaScript:

function bulkUpsert(docs) {
    var collection = getContext().getCollection();
    var collectionLink = collection.getSelfLink();

    // The count of upserted docs, also used as the current doc index.
    var count = 0;

    // Validate input.
    if (!docs) throw new Error("The array is undefined or null.");

    var docsLength = docs.length;
    if (docsLength === 0) {
        getContext().getResponse().setBody(0);
        return;
    }

    // Call the CRUD API to upsert the first document.
    tryCreate(docs[count], callback);

    // Note that there are 2 exit conditions:
    // 1) The upsertDocument request was not accepted.
    //    In this case the callback will not be called; we just call setBody and we are done.
    // 2) The callback was called docs.length times.
    //    In this case all documents were upserted and we don't need to call tryCreate anymore.
    //    Just call setBody and we are done.
    function tryCreate(doc, callback) {
        var isAccepted = collection.upsertDocument(collectionLink, doc, callback);

        // If the request was accepted, callback will be called.
        // Otherwise report the current count back to the client,
        // which will call the script again with the remaining set of docs.
        // This happens when this stored procedure has been running too long
        // and is about to get cancelled by the server. It allows the calling client
        // to resume this batch from the point we reached before isAccepted became false.
        if (!isAccepted) {
            getContext().getResponse().setBody(count);
        }
    }

    // This is called when collection.upsertDocument is done and the document has been persisted.
    function callback(err, doc, options) {
        if (err) throw err;

        // One more document has been upserted, increment the count.
        count++;

        if (count >= docsLength) {
            // If we have upserted all documents, we are done. Just set the response.
            getContext().getResponse().setBody(count);
        } else {
            // Upsert the next document.
            tryCreate(docs[count], callback);
        }
    }
}

I think the problem lies in the stored procedure rather than the Python script, but if that isn't the case I can provide the script. Any help would be massively appreciated; this has been a head-scratcher for days now!

Extra Info:

Throughput = 10,000 RU/s; the upsert payload per partition is consistently ~1.9 MB.

2 Answers

0
votes

Stored procedures have a bounded execution time of 5 seconds. However, you can write your stored procedure to handle bounded execution: check the boolean value returned by each collection operation (isAccepted in your sproc), return the count of items upserted so far when it is false, and have the client use that count to track and resume progress across invocations. There is an example here.
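
As a rough illustration of the client side of that pattern, here is a minimal Python sketch. The names upsert_all, execute_sproc and batch_size are hypothetical and not part of any SDK; execute_sproc stands in for whatever call runs the sproc on one batch and returns the count it reports (with the azure-cosmos package this would presumably wrap container.scripts.execute_stored_procedure, which is an assumption about your setup). All documents in one call must share a partition key, since a stored procedure runs against a single logical partition.

```python
from typing import Callable, List, Sequence


def upsert_all(
    docs: Sequence[dict],
    execute_sproc: Callable[[List[dict]], int],  # hypothetical wrapper around the sproc call
    batch_size: int = 500,  # assumed value; tune so a batch fits the time bound
) -> int:
    """Send docs in batches, resuming from the count each sproc call reports."""
    done = 0
    while done < len(docs):
        batch = list(docs[done:done + batch_size])
        # The sproc returns how many docs of this batch it upserted
        # before it finished or ran out of time.
        accepted = execute_sproc(batch)
        if accepted <= 0:
            # Guard against looping forever if a call makes no progress.
            raise RuntimeError(f"no progress at offset {done}")
        done += accepted
    return done
```

Smaller batches make it less likely that a single invocation reaches the time bound at all, at the cost of more round trips.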

0
votes

If anyone else has this problem, the workaround I've used is to temporarily increase the throughput from 10,000 to 100,000 RU/s while the bulk upsert operation is underway. The error doesn't occur if you use the bulk upsert stored procedure with a sufficiently high throughput. I think the timeout started happening frequently once around 30% of the 1.5 million records had been upserted, likely because the throughput wasn't divided sufficiently between partitions, which caused a bottleneck. Once the container is used in practice I may have to assign it a greater throughput again, or I may be able to reduce it to save costs. Either way, the code to change the throughput is quite simple:

new_throughput = 10000  # RU/s; use 100000 while the bulk upsert is running
container.replace_throughput(new_throughput)
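
If you want the throughput to drop back automatically even when the upsert fails partway, one option is to wrap the change in a context manager. This is only a sketch: temporary_throughput, high and normal are hypothetical names, and it assumes container is an object exposing the same replace_throughput method used above.

```python
from contextlib import contextmanager


@contextmanager
def temporary_throughput(container, high, normal):
    """Raise throughput for the duration of a block, then restore it.

    `container` is assumed to expose replace_throughput(value);
    `high` and `normal` are RU/s values chosen by the caller.
    """
    container.replace_throughput(high)
    try:
        yield container
    finally:
        # Runs even if the bulk upsert raises, so the higher
        # (more expensive) setting is not left in place.
        container.replace_throughput(normal)
```

Usage would look like `with temporary_throughput(container, 100000, 10000): ...` with the bulk upsert inside the block.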