
Update

I think my issue might be a client-side issue. If I close Visual Studio 2019, reopen it, and run, I can get a file to load completely (all 2 million records). However, if I try to run again, it fails with the issue described below.

Azure DocumentDB sporadically throws SocketException / GoneException

Original issue

I'm trying to bulk load some data files into Cosmos DB. I am using an example I found here:

https://github.com/Azure/azure-cosmosdb-bulkexecutor-dotnet-getting-started/tree/master/BulkImportSample

In the import sample, fake documents are generated in a loop. In my case, I am reading CSV files from a directory, and each CSV line is converted into one Cosmos DB document.

The default throughput was 400 RU/s, but I increased it to 10,000 RU/s. I don't need it that high after the import, but I can set it to whatever the bulk import task requires. Despite that, I seem to have some kind of throughput or throttling issue, and I am confused about how to figure out what throughput I need to perform this import. Each of these CSV files contains about 2 million rows, but each row has only 10 scalar values.
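
For reference, the throughput change itself can also be done programmatically rather than in the portal. This is a minimal sketch against the same v2 DocumentClient ("client") used in the code below; the helper name and the value passed in are illustrative only:

private async Task SetCollectionThroughputAsync(DocumentCollection collection, int requestUnitsPerSecond)
{
    // Sketch: change a collection's provisioned throughput by replacing its Offer.
    // Each collection has a single Offer resource that carries its RU/s setting.
    Offer offer = client.CreateOfferQuery()
        .Where(o => o.ResourceLink == collection.SelfLink)
        .AsEnumerable()
        .FirstOrDefault();

    if (offer == null)
    {
        throw new Exception("No offer found for collection " + collection.Id);
    }

    // OfferV2 wraps the existing offer with the new throughput value.
    await client.ReplaceOfferAsync(new OfferV2(offer, requestUnitsPerSecond));
}

It could be called once before the import with a higher value and again afterwards to drop it back down.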

It starts to work. I get some output from BulkExecutorTrace like, "BulkExecutorTrace Information: 0 : Partition index 0 : 2452 | Operated on 2452 docs in 1 seconds at 19342.86 RU/s with 4 tasks. Faced 0 throttles"

But then, after 19 lines of output like that, I get:

DocDBTrace Warning: 0 : Exception thrown: 'System.Threading.Tasks.TaskCanceledException' in mscorlib.dll
BulkExecutorTrace Information: 0 : RNTBD call timed out on channel 192.168.11.105:19676 -> 40.78.226.8:14319. Error: ReceiveTimeout
Partition index 0 : 22068 | Operated on 1226 docs in 1 seconds at 9652.88 RU/s with 20 tasks. Faced 0 throttles
Exception thrown: 'Microsoft.Azure.Documents.TransportException' in Microsoft.Azure.Documents.Client.dll
Exception thrown: 'Microsoft.Azure.Documents.TransportException' in mscorlib.dll
Exception thrown: 'Microsoft.Azure.Documents.TransportException' in mscorlib.dll
DocDBTrace Information: 0 : RequestAsync failed: RID: dbs/Diseases/colls/Diseases/sprocs/__.sys.commonBulkInsert, Resource Type: StoredProcedure, Op: (operationType: ExecuteJavaScript, resourceType: StoredProcedure), Address: rntbd://cdb-ms-prod-eastus1-fd10.documents.azure.com:14319/apps/00e9d5e0-018e-43a2-b5a4-f41c78498cdb/services/61a05d2a-fb30-455f-864e-c9e10e85684c/partitions/92daa841-dcc5-40f0-9c21-2956cda4d2ac/replicas/132323601201339145p/,
Exception: Microsoft.Azure.Documents.TransportException: A client transport error occurred: The request timed out while waiting for a server response.
(Time: 2020-04-26T18:18:32.9586759Z, activity ID: 80910ba7-8b36-40fa-a3bf-3eac239b00e2, error code: ReceiveTimeout [0x0010], base error: HRESULT 0x80131500,
URI: rntbd://cdb-ms-prod-eastus1-fd10.documents.azure.com:14319/apps/00e9d5e0-018e-43a2-b5a4-f41c78498cdb/services/61a05d2a-fb30-455f-864e-c9e10e85684c/partitions/92daa841-dcc5-40f0-9c21-2956cda4d2ac/replicas/132323601201339145p/,
connection: 192.168.11.105:19676 -> 40.78.226.8:14319, payload sent: True,
CPU history: (2020-04-26T18:18:12.7679827Z 80.069), (2020-04-26T18:18:22.7667638Z 28.038), (2020-04-26T18:18:22.7672671Z 100.000), (2020-04-26T18:18:22.7672671Z 0.000), (2020-04-26T18:18:22.7672671Z 0.000), (2020-04-26T18:18:32.7701961Z 20.629), CPU count: 8)
   at Microsoft.Azure.Documents.Rntbd.Channel.d__13.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Microsoft.Azure.Documents.Rntbd.LoadBalancingPartition.d__9.MoveNext()

private async Task RunBulkImportAsync()
    {
        DocumentCollection dataCollection = null;

        try
        {
            dataCollection = GetCollectionIfExists(client, DatabaseName, CollectionName);
            if (dataCollection == null)
            {
                throw new Exception("The data collection does not exist");
            }
        }
        catch (Exception de)
        {
            Trace.TraceError("Unable to initialize, exception message: {0}", de.Message);
            throw;
        }

        string partitionKeyProperty = dataCollection.PartitionKey.Paths[0].Replace("/", "");

        // Set retry options high for initialization (default values).
        client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
        client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;

        IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
        await bulkExecutor.InitializeAsync();

        // Set retries to 0 to pass control to bulk executor.
        client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
        client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;

        BulkImportResponse bulkImportResponse = null;
        long totalNumberOfDocumentsInserted = 0;
        double totalRequestUnitsConsumed = 0;
        double totalTimeTakenSec = 0;

        var tokenSource = new CancellationTokenSource();
        var token = tokenSource.Token;

        foreach (string d in Directory.GetDirectories(RootPath).Take(1))
        {

            foreach (string f in Directory.GetFiles(d).Take(1))
            {

                Trace.WriteLine("Processing file, " + f);

                var lines = File.ReadAllLines(f);

                Trace.WriteLine("File has " + lines.Count() + "lines");

                List<RowToImport> dataToImport = lines
                                       .Skip(1)
                                       .Select(v => RowToImport.FromCsv(v))
                                       .ToList();

                List<string> documentsToImportInBatch = dataToImport.Select(dti => GenerateJsonDocument(Guid.NewGuid().ToString(), dti.Disease, dti.Year, dti.Age, dti.Country, dti.CountryName, dti.CohortSize, dti.DeathsCongenital)).ToList();

                // Invoke bulk import API.

                var tasks = new List<Task>();

                tasks.Add(Task.Run(async () =>
                {
                    Trace.TraceInformation(String.Format("Executing bulk import for batch {0}", f));

                    do
                    {
                        try
                        {
                            bulkImportResponse = await bulkExecutor.BulkImportAsync(
                                documents: documentsToImportInBatch,
                                enableUpsert: true,
                                disableAutomaticIdGeneration: true,
                                maxConcurrencyPerPartitionKeyRange: 100,
                                maxInMemorySortingBatchSize: null,
                                cancellationToken: token);
                        }
                        catch (DocumentClientException de)
                        {
                            Trace.TraceError("Document client exception: {0}", de);
                            //Console.WriteLine("Document client exception: {0} {1}", f, de);
                            break;
                        }
                        catch (Exception e)
                        {
                            Trace.TraceError("Exception: {0}", e);
                            //Console.WriteLine("Exception: {0} {1}", f, e);
                            break;
                        }
                    } while (bulkImportResponse.NumberOfDocumentsImported < documentsToImportInBatch.Count);


                    Trace.WriteLine(String.Format("\nSummary for batch {0}:", f));
                    Trace.WriteLine("--------------------------------------------------------------------- ");
                    Trace.WriteLine(String.Format("Inserted {0} docs @ {1} writes/s, {2} RU/s in {3} sec",
                        bulkImportResponse.NumberOfDocumentsImported,
                        Math.Round(bulkImportResponse.NumberOfDocumentsImported / bulkImportResponse.TotalTimeTaken.TotalSeconds),
                        Math.Round(bulkImportResponse.TotalRequestUnitsConsumed / bulkImportResponse.TotalTimeTaken.TotalSeconds),
                        bulkImportResponse.TotalTimeTaken.TotalSeconds));
                    Trace.WriteLine(String.Format("Average RU consumption per document: {0}",
                        (bulkImportResponse.TotalRequestUnitsConsumed / bulkImportResponse.NumberOfDocumentsImported)));
                    Trace.WriteLine("---------------------------------------------------------------------\n ");

                    totalNumberOfDocumentsInserted += bulkImportResponse.NumberOfDocumentsImported;
                    totalRequestUnitsConsumed += bulkImportResponse.TotalRequestUnitsConsumed;
                    totalTimeTakenSec += bulkImportResponse.TotalTimeTaken.TotalSeconds;
                },
                token));

                await Task.WhenAll(tasks);

            }

            Trace.WriteLine("Overall summary:");
            Trace.WriteLine("--------------------------------------------------------------------- ");
            Trace.WriteLine(String.Format("Inserted {0} docs @ {1} writes/s, {2} RU/s in {3} sec",
                totalNumberOfDocumentsInserted,
                Math.Round(totalNumberOfDocumentsInserted / totalTimeTakenSec),
                Math.Round(totalRequestUnitsConsumed / totalTimeTakenSec),
                totalTimeTakenSec));
            Trace.WriteLine(String.Format("Average RU consumption per document: {0}",
                (totalRequestUnitsConsumed / totalNumberOfDocumentsInserted)));
            Trace.WriteLine("--------------------------------------------------------------------- ");


            Trace.WriteLine("\nPress any key to exit.");
            Console.ReadKey();
        }

    }

In the directory and file loops I am using Take(1) for now, just to get a single file working. In reality there are 4 directories with almost a hundred files in each.

Any advice on how I should throttle or configure this so that it can import all of the data?


1 Answer


That exception is not related to throughput. It points to a timeout / connectivity issue, which is covered on the SDK Troubleshooting page.

In your exception we can see that there is a CPU spike:

CPU history: 
(2020-04-26T18:18:12.7679827Z 80.069), 
(2020-04-26T18:18:22.7667638Z 28.038), 
(2020-04-26T18:18:22.7672671Z 100.000), 
(2020-04-26T18:18:22.7672671Z 0.000), 
(2020-04-26T18:18:22.7672671Z 0.000), 
(2020-04-26T18:18:32.7701961Z 20.629)

This could lead to connectivity issues. If you are running this on a local dev machine, check what other processes might be consuming CPU. If this is running in a VM, it might need more CPU cores or a larger VM size.

Also, based on your code, you are driving the Bulk Executor from concurrent operations (you are creating multiple Tasks in parallel).

The Bulk Executor performance tips indicate that this should not be done:

Since a single bulk operation API execution consumes a large chunk of the client machine's CPU and network IO (this happens by spawning multiple tasks internally), avoid spawning multiple concurrent tasks within your application process that execute bulk operation API calls. If a single bulk operation API call that is running on a single virtual machine is unable to consume the entire container's throughput (if your container's throughput > 1 million RU/s), it's preferred to create separate virtual machines to concurrently execute the bulk operation API calls.
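
Based on that guidance, here is a minimal sketch of how the question's import loop could be restructured so that only one bulk operation runs at a time (no Task.Run wrapper; it reuses the client, bulkExecutor, token, RowToImport and GenerateJsonDocument members from the question, and the logging is abbreviated):

// Sketch: one awaited BulkImportAsync call at a time, per the performance tips above.
// The Bulk Executor spawns its own internal tasks, so no extra Task.Run is needed.
foreach (string d in Directory.GetDirectories(RootPath))
{
    foreach (string f in Directory.GetFiles(d))
    {
        // Convert each CSV line (skipping the header) into a JSON document string.
        List<string> documents = File.ReadLines(f)
            .Skip(1)
            .Select(RowToImport.FromCsv)
            .Select(r => GenerateJsonDocument(Guid.NewGuid().ToString(), r.Disease, r.Year, r.Age,
                r.Country, r.CountryName, r.CohortSize, r.DeathsCongenital))
            .ToList();

        // Single bulk call per file; the executor manages per-partition concurrency internally.
        BulkImportResponse response = await bulkExecutor.BulkImportAsync(
            documents: documents,
            enableUpsert: true,
            disableAutomaticIdGeneration: true,
            cancellationToken: token);

        Trace.WriteLine(string.Format("{0}: imported {1} docs in {2:F1} sec",
            f, response.NumberOfDocumentsImported, response.TotalTimeTaken.TotalSeconds));
    }
}

If a single call still saturates the client machine's CPU, the document list can additionally be split into smaller chunks and imported chunk by chunk, which keeps the per-call memory and CPU footprint lower.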