We have a Dataflow streaming job that reads from Pub/Sub, extracts some fields, and writes to Bigtable. We are observing that the job's throughput drops while it is autoscaling. For example, if the job is running with 2 workers and processing 100 messages/sec, then during autoscaling the rate falls, sometimes to nearly 0, before rising to 500 messages/sec. We see this every time Dataflow scales up. It causes higher system lag during autoscaling and larger spikes of unacknowledged messages in Pub/Sub.
Is this the expected behavior of Dataflow autoscaling, or is there a way to maintain the 100 messages/sec while it autoscales and minimize the spikes of unacknowledged messages? (Please note: 100 messages/sec and 500 messages/sec are just example figures.)
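To make the concern concrete, here is a rough sketch of the backlog arithmetic using the example figures above. The 60-second stall and the assumption that publishers keep sending at a steady 100 messages/sec are invented for illustration; they are not measurements from the job, and the class and method names are hypothetical.

```java
final class BacklogSketch {

    /** Messages that accumulate while the job processes slower than the publish rate. */
    static long backlogAfterStall(long publishRate, long processRate, long stallSeconds) {
        return Math.max(0, publishRate - processRate) * stallSeconds;
    }

    /** Seconds needed to drain a backlog once processing outpaces publishing. */
    static double drainSeconds(long backlog, long publishRate, long processRate) {
        if (processRate <= publishRate) {
            return Double.POSITIVE_INFINITY; // the backlog never shrinks
        }
        return (double) backlog / (processRate - publishRate);
    }

    public static void main(String[] args) {
        long publishRate = 100;  // messages/sec, example figure from the question
        long stalledRate = 0;    // throughput during the autoscaling event
        long scaledRate = 500;   // messages/sec after scaling up, example figure
        long stallSeconds = 60;  // assumed stall length, not measured

        long backlog = backlogAfterStall(publishRate, stalledRate, stallSeconds);
        double drain = drainSeconds(backlog, publishRate, scaledRate);

        System.out.println("Backlog after stall: " + backlog + " messages"); // 6000
        System.out.println("Time to drain: " + drain + " seconds");          // 15.0
    }
}
```

Under these assumptions a one-minute stall leaves about 6,000 unacknowledged messages, which is the kind of spike we see in the Pub/Sub metrics.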
job ID: 2017-10-23_12_29_09-11538967430775949506
I am attaching screenshots of the Pub/Sub Stackdriver metrics and of Dataflow autoscaling.
There is a drop in the number of pull requests every time Dataflow autoscales. I could not take a screenshot with timestamps, but the drops in pull requests match the times at which Dataflow autoscales.
===========EDIT========================
We are also writing to GCS in parallel, using the windowing shown below.
inputCollection.apply("Windowing",
        Window.<String>into(FixedWindows.of(ONE_MINUTE))
            .triggering(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(ONE_MINUTE))
            .withAllowedLateness(ONE_HOUR)
            .discardingFiredPanes()
        )
    // Writing to GCS
    .apply(TextIO.write()
        .withWindowedWrites()
        .withNumShards(10)
        .to(options.getOutputPath())
        .withFilenamePolicy(
            new WindowedFileNames(options.getOutputPath())));
WindowedFileNames.java
public class WindowedFileNames extends FilenamePolicy implements OrangeStreamConstants {

    /**
     *
     */
    private static final long serialVersionUID = 1L;
    private static Logger logger = LoggerFactory.getLogger(WindowedFileNames.class);
    protected final String outputPath;

    public WindowedFileNames(String outputPath) {
        this.outputPath = outputPath;
    }

    @Override
    public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext context, String extension) {
        IntervalWindow intervalWindow = (IntervalWindow) context.getWindow();
        DateTime date = intervalWindow.maxTimestamp().toDateTime(DateTimeZone.forID("America/New_York"));
        String fileName = String.format(FOLDER_EXPR, outputPath, // "orangestreaming",
                DAY_FORMAT.print(date), HOUR_MIN_FORMAT.print(date) + HYPHEN + context.getShardNumber());
        logger.error(fileName + "::::: File name for the current minute");
        return outputDirectory
                .getCurrentDirectory()
                .resolve(fileName, StandardResolveOptions.RESOLVE_FILE);
    }

    @Override
    public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context, String extension) {
        return null;
    }
}
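For reference, this is roughly how the policy maps a window to a file name. It is a standalone sketch that uses java.time in place of Joda-Time so it runs without Beam on the classpath. The values of FOLDER_EXPR, DAY_FORMAT, HOUR_MIN_FORMAT and HYPHEN are hypothetical stand-ins, because the real ones live in OrangeStreamConstants, which is not shown here; the bucket path is also made up.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

final class WindowFileNameSketch {

    // Hypothetical stand-ins for the constants defined in OrangeStreamConstants.
    static final String FOLDER_EXPR = "%s/%s/%s";
    static final DateTimeFormatter DAY_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    static final DateTimeFormatter HOUR_MIN_FORMAT = DateTimeFormatter.ofPattern("HH-mm");
    static final String HYPHEN = "-";
    static final ZoneId NEW_YORK = ZoneId.of("America/New_York");

    /** Builds the file name for one shard of the window that ends (exclusively) at windowEnd. */
    static String fileNameFor(String outputPath, Instant windowEnd, int shardNumber) {
        // IntervalWindow.maxTimestamp() is one millisecond before the exclusive window end.
        ZonedDateTime date = windowEnd.minusMillis(1).atZone(NEW_YORK);
        return String.format(FOLDER_EXPR, outputPath,
                DAY_FORMAT.format(date),
                HOUR_MIN_FORMAT.format(date) + HYPHEN + shardNumber);
    }

    public static void main(String[] args) {
        // One-minute window [19:29, 19:30) UTC, which is 15:29 in New York on this date.
        Instant windowEnd = Instant.parse("2017-10-23T19:30:00Z");
        System.out.println(fileNameFor("gs://example-bucket/out", windowEnd, 3));
        // prints gs://example-bucket/out/2017-10-23/15-29-3
    }
}
```

With withNumShards(10), each one-minute window therefore produces ten such files, with shard numbers 0 through 9.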