Update
I think my issue might be on the client side. If I close Visual Studio 2019, open it back up, and run, I can get a file completely loaded (all 2 million records). However, if I try to run again without restarting, it fails with the issue described below.
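That makes me suspect stale state in the shared DocumentClient, so I am experimenting with creating a fresh client per run instead of reusing a static one. A rough sketch (EndpointUrl and AuthKey are placeholders for my actual configuration):

// Sketch: use a fresh DocumentClient per run to mimic the "fresh Visual Studio" behavior.
// EndpointUrl and AuthKey below stand in for my real config values.
using (var freshClient = new DocumentClient(
    new Uri(EndpointUrl),
    AuthKey,
    new ConnectionPolicy
    {
        ConnectionMode = ConnectionMode.Direct,
        ConnectionProtocol = Protocol.Tcp
    }))
{
    // ... run the bulk import against freshClient here ...
}

I don't know yet whether that changes anything; it's just the first thing the restart behavior suggests.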
Azure DocumentDB sporadically throws SocketException / GoneException
Original issue
I'm trying to bulk load some data files into Cosmos DB. I am using an example I found here:
In the import sample, they generate fake documents in a loop. In my case I am reading CSV files from a directory, and each CSV line converts to one Cosmos DB document.
The default throughput was 400 RU/s, and I increased it to 10,000. I don't need it that high after the import, but I can set it to whatever the bulk import task requires. Even so, I am hitting some kind of throughput or throttling issue, and I am confused about how to figure out what throughput this import actually needs. Each of these CSV files contains about 2 million rows, but each row has only 10 scalar values.
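For reference, here is roughly how I change the provisioned throughput from code around the import, using the SDK's offer APIs. This is a sketch that assumes client and dataCollection are already set up as in the code further down; the RU values are just the ones I've been trying:

// Sketch: scale the collection's offer up before the import, back down after.
Offer offer = client.CreateOfferQuery()
    .Where(o => o.ResourceLink == dataCollection.SelfLink)
    .AsEnumerable()
    .FirstOrDefault();

// Raise to 10,000 RU/s for the bulk import.
await client.ReplaceOfferAsync(new OfferV2(offer, 10000));

// ... run the import ...

// Re-read the offer and drop back to 400 RU/s afterwards.
offer = client.CreateOfferQuery()
    .Where(o => o.ResourceLink == dataCollection.SelfLink)
    .AsEnumerable()
    .FirstOrDefault();
await client.ReplaceOfferAsync(new OfferV2(offer, 400));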
It starts out working. I get output from BulkExecutorTrace like:

BulkExecutorTrace Information: 0 : Partition index 0 : 2452 | Operated on 2452 docs in 1 seconds at 19342.86 RU/s with 4 tasks. Faced 0 throttles

But after 19 lines of output like that, I get:
DocDBTrace Warning: 0 : Exception thrown: 'System.Threading.Tasks.TaskCanceledException' in mscorlib.dll
BulkExecutorTrace Information: 0 : RNTBD call timed out on channel 192.168.11.105:19676 -> 40.78.226.8:14319. Error: ReceiveTimeout
Partition index 0 : 22068 | Operated on 1226 docs in 1 seconds at 9652.88 RU/s with 20 tasks. Faced 0 throttles
Exception thrown: 'Microsoft.Azure.Documents.TransportException' in Microsoft.Azure.Documents.Client.dll
Exception thrown: 'Microsoft.Azure.Documents.TransportException' in mscorlib.dll
Exception thrown: 'Microsoft.Azure.Documents.TransportException' in mscorlib.dll
DocDBTrace Information: 0 : RequestAsync failed: RID: dbs/Diseases/colls/Diseases/sprocs/__.sys.commonBulkInsert, Resource Type: StoredProcedure, Op: (operationType: ExecuteJavaScript, resourceType: StoredProcedure), Address: rntbd://cdb-ms-prod-eastus1-fd10.documents.azure.com:14319/apps/00e9d5e0-018e-43a2-b5a4-f41c78498cdb/services/61a05d2a-fb30-455f-864e-c9e10e85684c/partitions/92daa841-dcc5-40f0-9c21-2956cda4d2ac/replicas/132323601201339145p/, Exception: Microsoft.Azure.Documents.TransportException: A client transport error occurred: The request timed out while waiting for a server response. (Time: 2020-04-26T18:18:32.9586759Z, activity ID: 80910ba7-8b36-40fa-a3bf-3eac239b00e2, error code: ReceiveTimeout [0x0010], base error: HRESULT 0x80131500, URI: rntbd://cdb-ms-prod-eastus1-fd10.documents.azure.com:14319/apps/00e9d5e0-018e-43a2-b5a4-f41c78498cdb/services/61a05d2a-fb30-455f-864e-c9e10e85684c/partitions/92daa841-dcc5-40f0-9c21-2956cda4d2ac/replicas/132323601201339145p/, connection: 192.168.11.105:19676 -> 40.78.226.8:14319, payload sent: True, CPU history: (2020-04-26T18:18:12.7679827Z 80.069), (2020-04-26T18:18:22.7667638Z 28.038), (2020-04-26T18:18:22.7672671Z 100.000), (2020-04-26T18:18:22.7672671Z 0.000), (2020-04-26T18:18:22.7672671Z 0.000), (2020-04-26T18:18:32.7701961Z 20.629), CPU count: 8)
   at Microsoft.Azure.Documents.Rntbd.Channel.d__13.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Microsoft.Azure.Documents.Rntbd.LoadBalancingPartition.d__9.MoveNext()
private async Task RunBulkImportAsync()
{
    DocumentCollection dataCollection = null;
    try
    {
        dataCollection = GetCollectionIfExists(client, DatabaseName, CollectionName);
        if (dataCollection == null)
        {
            throw new Exception("The data collection does not exist");
        }
    }
    catch (Exception de)
    {
        Trace.TraceError("Unable to initialize, exception message: {0}", de.Message);
        throw;
    }

    string partitionKeyProperty = dataCollection.PartitionKey.Paths[0].Replace("/", "");

    // Set retry options high for initialization (default values).
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;

    IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
    await bulkExecutor.InitializeAsync();

    // Set retries to 0 to pass congestion control to the bulk executor.
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;

    BulkImportResponse bulkImportResponse = null;
    long totalNumberOfDocumentsInserted = 0;
    double totalRequestUnitsConsumed = 0;
    double totalTimeTakenSec = 0;

    var tokenSource = new CancellationTokenSource();
    var token = tokenSource.Token;

    foreach (string d in Directory.GetDirectories(RootPath).Take(1))
    {
        foreach (string f in Directory.GetFiles(d).Take(1))
        {
            Trace.WriteLine("Processing file, " + f);
            var lines = File.ReadAllLines(f);
            Trace.WriteLine("File has " + lines.Length + " lines");

            // Skip the header row; each remaining CSV line becomes one document.
            List<RowToImport> dataToImport = lines
                .Skip(1)
                .Select(v => RowToImport.FromCsv(v))
                .ToList();

            List<string> documentsToImportInBatch = dataToImport
                .Select(dti => GenerateJsonDocument(Guid.NewGuid().ToString(), dti.Disease, dti.Year, dti.Age, dti.Country, dti.CountryName, dti.CohortSize, dti.DeathsCongenital))
                .ToList();

            // Invoke the bulk import API.
            var tasks = new List<Task>();
            tasks.Add(Task.Run(async () =>
            {
                Trace.TraceInformation(String.Format("Executing bulk import for batch {0}", f));
                do
                {
                    try
                    {
                        bulkImportResponse = await bulkExecutor.BulkImportAsync(
                            documents: documentsToImportInBatch,
                            enableUpsert: true,
                            disableAutomaticIdGeneration: true,
                            maxConcurrencyPerPartitionKeyRange: 100,
                            maxInMemorySortingBatchSize: null,
                            cancellationToken: token);
                    }
                    catch (DocumentClientException de)
                    {
                        Trace.TraceError("Document client exception: {0}", de);
                        break;
                    }
                    catch (Exception e)
                    {
                        Trace.TraceError("Exception: {0}", e);
                        break;
                    }
                } while (bulkImportResponse.NumberOfDocumentsImported < documentsToImportInBatch.Count);

                Trace.WriteLine(String.Format("\nSummary for batch {0}:", f));
                Trace.WriteLine("---------------------------------------------------------------------");
                Trace.WriteLine(String.Format("Inserted {0} docs @ {1} writes/s, {2} RU/s in {3} sec",
                    bulkImportResponse.NumberOfDocumentsImported,
                    Math.Round(bulkImportResponse.NumberOfDocumentsImported / bulkImportResponse.TotalTimeTaken.TotalSeconds),
                    Math.Round(bulkImportResponse.TotalRequestUnitsConsumed / bulkImportResponse.TotalTimeTaken.TotalSeconds),
                    bulkImportResponse.TotalTimeTaken.TotalSeconds));
                Trace.WriteLine(String.Format("Average RU consumption per document: {0}",
                    (bulkImportResponse.TotalRequestUnitsConsumed / bulkImportResponse.NumberOfDocumentsImported)));
                Trace.WriteLine("---------------------------------------------------------------------\n");

                totalNumberOfDocumentsInserted += bulkImportResponse.NumberOfDocumentsImported;
                totalRequestUnitsConsumed += bulkImportResponse.TotalRequestUnitsConsumed;
                totalTimeTakenSec += bulkImportResponse.TotalTimeTaken.TotalSeconds;
            },
            token));

            await Task.WhenAll(tasks);
        }
    }

    Trace.WriteLine("Overall summary:");
    Trace.WriteLine("---------------------------------------------------------------------");
    Trace.WriteLine(String.Format("Inserted {0} docs @ {1} writes/s, {2} RU/s in {3} sec",
        totalNumberOfDocumentsInserted,
        Math.Round(totalNumberOfDocumentsInserted / totalTimeTakenSec),
        Math.Round(totalRequestUnitsConsumed / totalTimeTakenSec),
        totalTimeTakenSec));
    Trace.WriteLine(String.Format("Average RU consumption per document: {0}",
        (totalRequestUnitsConsumed / totalNumberOfDocumentsInserted)));
    Trace.WriteLine("---------------------------------------------------------------------");

    Trace.WriteLine("\nPress any key to exit.");
    Console.ReadKey();
}
In the directory and file loops I am using Take(1) for now, just to try to get one file to work. In reality, though, there are 4 directories with almost a hundred files in each.

Can anyone point me to advice on how I should be throttling this so it imports all of the data?
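One thing I have been considering is slicing each file into smaller chunks so no single BulkImportAsync call has to handle ~2 million documents at once. A rough sketch of what I mean (chunkSize is just a guess I would tune):

// Sketch: import the file in fixed-size slices instead of one giant batch.
const int chunkSize = 100000; // placeholder value to tune
for (int i = 0; i < documentsToImportInBatch.Count; i += chunkSize)
{
    List<string> chunk = documentsToImportInBatch.Skip(i).Take(chunkSize).ToList();
    bulkImportResponse = await bulkExecutor.BulkImportAsync(
        documents: chunk,
        enableUpsert: true,
        disableAutomaticIdGeneration: true,
        cancellationToken: token);
}

But I don't know whether the batch size is actually the problem here, or whether the transport timeouts above point at something else entirely.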