I'm having a problem where I have a pipeline that keeps failing when operating over a large dataset. The worker logs don't show any errors other than warnings about a lull in processing that occurs during reshuffle. My suspicion is that I'm killing the workers by exceeding their allocated memory, as the only hint as to what is happening is that I can see workers spin up in the logs but then proceed to do nothing. Eventually, if I wait long enough or kill the pipeline, they fail.
I want to reduce my elements down to just a few concurrently running groups for inserting into elasticsearch. So for example, go from 40 workers doing the processing to just 7 workers doing batch inserts into ES.
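In code, the shape of what I'm after for the insert step is roughly this (a minimal Python SDK sketch; `docs` stands in for the processed documents and the shard count of 7 is just the target from above):

```python
import random

import apache_beam as beam

NUM_INSERT_SHARDS = 7  # target number of concurrently running insert groups

# Assign each document one of a small, fixed set of keys so that the
# downstream GroupByKey yields at most NUM_INSERT_SHARDS groups to insert.
keyed = docs | 'KeyForInsert' >> beam.Map(
    lambda doc: (random.randint(0, NUM_INSERT_SHARDS - 1), doc))
grouped = keyed | 'GroupForInsert' >> beam.GroupByKey()
```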
I've put windowing between the processing and the elasticsearch insert. I have logging statements, and I can see that in spite of my windowing with AfterCount, the window sizes seem to be largely unbounded, i.e. I set the window size to 1000 and I get a group of 12k. I think the problem is that Apache Beam operates on bundles and only evaluates triggers after a bundle has been processed, and one of my transforms may generate any number of output elements into the collection.
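For reference, the insert-side windowing looks roughly like this (Python SDK sketch; the exact trigger wiring may differ slightly from my real code, and `docs` is a placeholder for the processed documents):

```python
import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterCount, AfterWatermark)

# Intended to emit panes of roughly 1000 documents. As far as I can tell,
# AfterCount(1000) is only a lower bound on when an early pane may fire,
# not an upper bound on pane size, which would explain the 12k groups.
windowed_docs = docs | 'WindowForInsert' >> beam.WindowInto(
    window.FixedWindows(10),
    trigger=AfterWatermark(early=AfterCount(1000)),
    accumulation_mode=AccumulationMode.DISCARDING)
```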
What is the intended way to accomplish this task?
To visualize what I want to happen:
1000 items / 50 group window -> outputs 500,000+ docs / window -> insert with 7 workers in batches of 2k docs each
The current pipeline flow (I've tried many variations of this):
Read from datastore
| window by timestamp (5 seconds) with early trigger of 100 elements
| group by random int between 0 and 50 (need groups for batch fetch call in processing)
| fetch from service in batch and process with output documents
| window by timestamp (10 seconds) with early trigger of 1000 documents
| group by random int between 0 and X
| insert into ES
where I've tried various values of X. Lower values result in higher insert throughput, but the pipeline fails at the insertion step when working with larger amounts of data. Right now I'm trying a run where X=80: I see moderate throughput, but there have been some timeouts, and the batch calls that normally take a couple of seconds or less are now taking 15 seconds or more to complete.
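Sketched as a Beam Python pipeline, that flow is roughly the following (the Datastore read, FetchAndProcessFn, and BulkInsertToESFn are placeholders for my actual source and transforms; the trigger settings mirror the description above):

```python
import random

import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterCount, AfterWatermark)

X = 80  # number of insert shards currently being tried

with beam.Pipeline() as p:
    _ = (
        p
        | 'ReadFromDatastore' >> ReadFromDatastore(query)  # placeholder source
        | 'WindowForFetch' >> beam.WindowInto(
            window.FixedWindows(5),
            trigger=AfterWatermark(early=AfterCount(100)),
            accumulation_mode=AccumulationMode.DISCARDING)
        | 'KeyForFetch' >> beam.Map(lambda e: (random.randint(0, 49), e))
        | 'GroupForFetch' >> beam.GroupByKey()
        # batch fetch from the external service, emit output documents
        | 'FetchAndProcess' >> beam.ParDo(FetchAndProcessFn())
        | 'WindowForInsert' >> beam.WindowInto(
            window.FixedWindows(10),
            trigger=AfterWatermark(early=AfterCount(1000)),
            accumulation_mode=AccumulationMode.DISCARDING)
        | 'KeyForInsert' >> beam.Map(lambda doc: (random.randint(0, X - 1), doc))
        | 'GroupForInsert' >> beam.GroupByKey()
        # bulk insert each (key, documents) group into Elasticsearch
        | 'BulkInsertToES' >> beam.ParDo(BulkInsertToESFn())
    )
```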
Adding more workers to the insert step seems to work around the workers failing to do any inserting at all, but the resulting high number of batch requests is inefficient: the requests end up taking longer to complete and risk timing out and overloading the cluster.
To put it another way: at this point I'm just flailing and trying different parameters, and surely there must be a way to design the pipeline so that this isn't an issue regardless of data volume or window size.