I am trying to copy a large amount of data (over 70 million rows) from one table to another. I am having trouble because .set-or-append is timing out even when I microbatch manually. I am willing to bet there is some microbatch size, in terms of rows, that will allow .set-or-append to succeed, but my problem is programmatically delegating these microbatches to the .set-or-append command. This is what I have so far:

.set-or-append async MyTargetTable <|
let relevantIds = SourceTableName
| summarize innerCount=count() by __id
| where innerCount >= 5000
| top 10000 by innerCount desc;
SourceTableName
| where __id in (relevantIds)
| extend bucket = hash(row_number(), 10)
| partition by bucket
// unsure from here

The hashing function in Kusto is great. I ran a basic query ranging over 1 to 70 million and checked what the distribution of the hash function was for 10 buckets; it was roughly 7 million rows each. I would like to use this function to create my partitioning. Essentially, this keeps my microbatches roughly evenly sized, and I can easily programmatically increase the number of buckets if the microbatches fail.
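
For reference, the distribution check I ran was roughly along these lines (10 buckets assumed):

range i from 1 to 70000000 step 1
| summarize rows = count() by bucket = hash(i, 10)
| order by bucket asc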

My problem is that the partition by operator requires a subquery, and you cannot pass .set-or-append as a subquery to partition by. So I am unsure how I can use the partition by operator together with .set-or-append.

How can I split my data into 10 (or N) buckets of roughly equal size (based on number of rows) and send each one programmatically to .set-or-append?

One more thing: since .set-or-append async returns an operation id, it would be great to get a list of operation ids for the microbatches that were queued up.
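
I assume each returned id could later be checked individually with something along these lines, where <OperationId> is a placeholder for an id returned by one of the async commands:

.show operations <OperationId>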

EDIT: I saw a related question and tried to model my query after it; would something like this work?

.set-or-append TargetTable <|
let _Scope = () {
    let relevantIds = SourceTable
    | summarize innerCount=count() by __id
    | where innerCount >= 5000
    | top 10000 by innerCount desc;
    let cachedIds = materialize(relevantIds);
    let BUCKETS = 10;
    range bucket from 0 to BUCKETS - 1 step 1
    | partition by bucket
    {
        SourceTable
        | where __id in (cachedIds)
        | where hash(row_number(), BUCKETS) == toscalar(bucket)
    }
};
union (_Scope())

What do you think? Would this queue up 10 async ingestion operations partitioned by row number?

1 Answer

As you correctly mentioned, you can't invoke a control command (e.g. .set-or-append) from a query, regardless of whether that query uses the partition operator.

In order to copy a large table, you could either:

  1. Export the data to cloud storage, then ingest it from the blobs using the API or tools like ADF / LightIngest (see the export sketch after this list).

  2. Orchestrate a set of .set-or-append commands, where each handles a subset of the source data, up to ~1M records per command.

  • You could 'partition' the source data set using any filter that makes sense with respect to your data set, e.g. a hash of a key column (see the .set-or-append sketch after this list).
  • Compared to option 1, this option is somewhat more complex.
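
For option 1, a minimal sketch of the export side, assuming CSV output; the storage account, container, and key below are placeholders you'd replace with your own, and the ingestion side would then be driven by ADF / LightIngest / the APIs over the resulting blobs:

.export async compressed to csv (h@"https://<storage-account>.blob.core.windows.net/<container>;<storage-account-key>") <|
SourceTableName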
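
For option 2, a minimal sketch of a single microbatch command, assuming the buckets are derived from hash() over your __id column (a stable key) rather than row_number(); the table names and the relevantIds filter are reused from your question, and an external script would issue one such command per bucket value:

.set-or-append async MyTargetTable <|
let relevantIds = SourceTableName
| summarize innerCount = count() by __id
| where innerCount >= 5000
| top 10000 by innerCount desc;
SourceTableName
| where __id in (relevantIds)
| where hash(__id, 10) == 0 // repeat with 1, 2, ..., 9; increase the bucket count to shrink each microbatch

Since each async command returns its own operation id, collecting the ids as the commands are issued gives you the list of microbatch operations you asked about.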