0
votes

We have a dataflow streaming job which reads from PubSub, extracts some fields and writes to bigtable. We are observing that dataflow's throughput drops when it is autoscaling. For example, if the dataflow job is currently running with 2 workers and processing at the rate of 100 messages/sec, during autoscaling this rate of 100 messages/sec drops down and some times it drops down to nearly 0 and then increases to 500 messages/sec. We are seeing this every time, dataflow upscales. This is causing higher system lag during autoscaling and bigger spikes of unacknowledged messages in pub/sub.

Is this the expected behavior of dataflow autoscaling or is there a way to maintain this 100 messages/sec while it autoscales and minimize the spices of unacknowledged messages? (Please Note: 100 messages/sec and 500 messages/sec are just example figures)

job ID: 2017-10-23_12_29_09-11538967430775949506

I am attaching the screen shots of pub/sub stackdriver and dataflow autoscaling.enter image description here

enter image description here

enter image description here

There is drop in number of pull requests everytime dataflow autoscales. I could not take screenshot with timestamps, but drop in pull requests matches with the time data flow autoscaling. ===========EDIT========================

We are writing to GCS in parallel using below mentioned windowing.

inputCollection.apply("Windowing",
            Window.<String>into(FixedWindows.of(ONE_MINUTE))

.triggering(AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(ONE_MINUTE))
                  .withAllowedLateness(ONE_HOUR)
                  .discardingFiredPanes()
                  )

    //Writing to GCS
                .apply(TextIO.write()
                            .withWindowedWrites()
                            .withNumShards(10)
                            .to(options.getOutputPath())
                            .withFilenamePolicy(
                                    new 
WindowedFileNames(options.getOutputPath())));

WindowedFileNames.java

public class WindowedFileNames  extends FilenamePolicy implements OrangeStreamConstants{

/**
 * 
 */
private static final long serialVersionUID = 1L;
private static Logger logger = LoggerFactory.getLogger(WindowedFileNames.class);
protected final String outputPath;
public WindowedFileNames(String outputPath) {
    this.outputPath = outputPath;
}


@Override
public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext context, String extension) {
    IntervalWindow intervalWindow = (IntervalWindow) context.getWindow();
    DateTime date = intervalWindow.maxTimestamp().toDateTime(DateTimeZone.forID("America/New_York"));

    String fileName = String.format(FOLDER_EXPR, outputPath, //"orangestreaming", 
            DAY_FORMAT.print(date), HOUR_MIN_FORMAT.print(date) + HYPHEN + context.getShardNumber());
    logger.error(fileName+"::::: File name for the current minute");
     return outputDirectory
              .getCurrentDirectory()
              .resolve(fileName, StandardResolveOptions.RESOLVE_FILE);
}

@Override
public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context, String extension) {
    return null;
}





}
1
taking a look at thisPablo

1 Answers

0
votes

What is actually happening is that your throughput is decreasing first, and that is the reason that workers are scaling up.

If you look at your pipeline around 1:30am, the series of events is like so:

  1. Around 1:23am, throughput drops. This builds up the backlog.
  2. Around 1:28am, the pipeline unblocks and starts making progress.
  3. Due to the large backlog, the pipeline scales up to 30 workers.

Also, if you look at the autoscaling UI, the justification for going up to 30 workers is:

"Raised the number of workers to 30 so that the pipeline can catch up with its backlog and keep up with its input rate."

Hope that helps!