I am trying to copy a large amount of data (over 70 million rows) from one table to another. I am having trouble because .set-or-append times out even when I microbatch manually. I am willing to bet there is some microbatch size, in terms of rows, at which .set-or-append will succeed; my problem is programmatically delegating those microbatches to the .set-or-append command. This is what I have so far:
.set-or-append async MyTargetTable <|
let relevantIds = SourceTableName
| summarize innerCount = count() by __id
| where innerCount >= 5000
| top 10000 by innerCount desc;
SourceTableName
| where __id in (relevantIds)
| serialize  // row_number() requires a serialized row set
| extend bucket = hash(row_number(), 10)
| partition by bucket
// unsure from here
The hash function in Kusto is great. I ran a basic query ranging over 1 to 70 million and checked the distribution of the hash function across 10 buckets; it came out to roughly 7 million rows per bucket. I would like to use this function to create my partitioning. Essentially, it makes my microbatches roughly evenly sized, and I can easily, programmatically, increase the number of buckets if the microbatches fail.
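For reference, the distribution check I mention was along these lines (a rough sketch, not necessarily the exact query I ran):

range n from 1 to 70000000 step 1
| summarize rowsPerBucket = count() by bucket = hash(n, 10)
| order by bucket asc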
My problem is that the partition by operator requires a subquery, and you cannot pass .set-or-append as a subquery to partition by, so I am unsure how I can combine the partition by operator with .set-or-append.
How can I split my data into 10 (or N) buckets of roughly equal size, based on number of rows, and send each bucket programmatically to .set-or-append?
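To make the question concrete, what I am imagining is one command per bucket value, roughly like this for bucket 0 (just a sketch of the shape I have in mind, using the table and column names from above):

.set-or-append async MyTargetTable <|
let relevantIds = SourceTableName
| summarize innerCount = count() by __id
| where innerCount >= 5000
| top 10000 by innerCount desc
| project __id;
SourceTableName
| where __id in (relevantIds)
| serialize  // row_number() needs a serialized row set
| extend bucket = hash(row_number(), 10)
| where bucket == 0  // issue one such command per bucket value 0..9
| project-away bucket

The part I am missing is how to issue these N commands (varying the bucket value) without hand-writing each one.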
One more thing: since an async .set-or-append returns an operation id, it would be great to get back the list of operation ids for the microbatches that were queued up.
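As far as I understand, each async command returns a single-row table with an OperationId, and those can then be checked afterwards with something like the following (assuming the usual .show operations output columns):

.show operations
| where StartedOn > ago(1h)
| project OperationId, Operation, State, Status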
EDIT: I saw a related question and tried to model my query after it. Would something like this work?
.set-or-append async TargetTable <|
let _Scope = () {
    let relevantIds = SourceTable
    | summarize innerCount = count() by __id
    | where innerCount >= 5000
    | top 10000 by innerCount desc;
    let cachedIds = materialize(relevantIds);
    let BUCKETS = 10;
    range bucket from 0 to BUCKETS - 1 step 1
    | partition by bucket
    {
        SourceTable
        | where __id in (cachedIds)
        | serialize  // needed for row_number()
        | where hash(row_number(), BUCKETS) == toscalar(bucket)
    }
};
union (_Scope())
What do you think? Would this queue up 10 async ingestion operations, partitioned by row number?