1
vote

I'm having a problem where I have a pipeline that keeps failing when operating over a large dataset. The worker logs don't show any errors other than warnings about a lull in processing that occurs during reshuffle. My suspicion is that I'm killing the workers by exceeding their allocated memory, as the only hint to what is happening is that I can see workers spin up in the logs but then proceed to do nothing. Eventually, if I wait long enough or kill the pipeline, they fail.

I want to reduce my elements down to just a few concurrently running groups for inserting into elasticsearch. So for example, go from 40 workers doing the processing to just 7 workers doing batch inserts into ES.

I've put windowing between the processing and the Elasticsearch insert. I have logging statements, and I can see that in spite of my windowing with AfterCount, the window sizes seem to be largely unbounded, i.e. I set the window size to 1000 and I get a group of 12k. I think the problem is that Apache Beam operates on bundles and only triggers after the bundle has been processed, and one of my transforms may generate any number of output elements into the collection.

What is the intended way to accomplish this task?

To visualize what I want to happen:

1000 items / 50 group window -> outputs 500,000+ docs / window -> insert with 7 workers in batches of 2k docs each

The current pipeline flow (I've tried many variations of this):

Read from datastore
| window by timestamp (5 seconds) with early trigger of 100 elements
| group by random int between 0 and 50 (need groups for batch fetch call in processing)
| fetch from service in batch and process with output documents
| window by timestamp (10 seconds) with early trigger of 1000 documents
| group by random int between 0 and X
| insert into ES

where I've tried various values of X. Lower values result in higher insert throughput, but the pipeline fails at the insertion step when working with larger amounts of data. Right now I'm trying a run where X=80: I see moderate throughput, but there have been some timeouts, and the batch calls that normally take a couple of seconds or less are now taking 15 seconds or more to complete.

Adding more workers to the insert step seems to work around the workers failing to do any inserting at all, but the high number of batch requests is inefficient: they just end up taking longer to complete, and they risk timing out and overloading the cluster.

To better phrase things, at this point I'm just flailing and trying different parameters, and surely there must be a way to design the pipeline so this isn't an issue regardless of data or window sizes.

Great, in classic Stack Overflow fashion, I post and then realize that for the past 10 hours the batch pipeline I was deploying was missing the second windowing entirely, because I forgot to update the code to pass the parameter needed to add it to the pipeline (I can't use the second windowing in a streaming pipeline). Will update on this... – James Hutchison
Ok, re-added the window that occurs prior to ES, with a shuffle before it, and... it just stopped working at the insertion step, no errors. Logs show workers spinning up, then just a gap in time with nothing. Workers downscaled to 1. – James Hutchison
After working with this, I was able to determine that the workers mysteriously fail if a processing group has too much data associated with it. The solution is to create more groups, but if you want to limit concurrency this becomes an issue: you'll need to set max workers in a batch pipeline to the smallest level of concurrency you want to support anywhere in your pipeline. – James Hutchison

1 Answer

2
votes

I'm not sure what the root cause of the silently failing workers is, but the reason your windows aren't bounded when you use the AfterCount trigger is that triggers only work in streaming pipelines. Since yours is a batch pipeline, the count is ignored.

The solution is to avoid using windowing, triggers, and group-by to batch elements together, and instead use the BatchElements transform, which does exactly what you need. With it, your pipeline looks like this:

Read from datastore
| batch into X elements
| fetch from service in batch and process with output documents
| batch into Y documents
| insert into ES