0 votes

I am trying to run Google Dataflow for a simulation process. I started the Dataflow job with a maximum of 5 worker nodes and autoscaling enabled (THROUGHPUT_BASED).
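For context, I am wiring the pipeline options roughly as follows (a minimal sketch; the staging bucket below is a placeholder, not my real value):

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;

DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject("com-dl-parts-dev");
options.setStagingLocation("gs://my-staging-bucket/staging"); // placeholder bucket
options.setMaxNumWorkers(5);                                  // cap the worker pool at 5
options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);

Pipeline pipelineTransform = Pipeline.create(options);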

The issue is that Dataflow is not utilizing all 5 worker nodes and gives me the following message:

Autoscaling: Reduced the number of workers to 1 based on the ability to parallelize the work in the currently running step(s).

Please suggest what the issue might be.

Logs are attached below.

2017-05-10T06:01:36.555Z: Detail: (5de468ab73bd7581): Autoscaling is enabled for job 2017-05-09_23_01_36-6765647625807820060. The number of workers will be between 1 and 1000.
2017-05-10T06:01:37.811Z: Detail: (feb335244b957ccc): Checking required Cloud APIs are enabled.
2017-05-10T06:02:05.328Z: Detail: (feb335244b9573fd): Expanding GroupByKey operations into optimizable parts.
2017-05-10T06:02:05.332Z: Detail: (feb335244b957017): Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
2017-05-10T06:02:05.714Z: Detail: (feb335244b957a72): Fusing adjacent ParDo, Read, Write, and Flatten operations
2017-05-10T06:02:05.716Z: Detail: (feb335244b95768c): Fusing consumer AnonymousParDo into Read/DataflowPipelineRunner.BatchBigQueryIONativeRead
2017-05-10T06:02:05.718Z: Detail: (feb335244b9572a6): Fusing consumer WriteToBigQuery/DataflowPipelineRunner.BatchBigQueryIOWrite/DataflowPipelineRunner.BatchBigQueryIONativeWrite into AnonymousParDo
2017-05-10T06:02:05.728Z: Detail: (feb335244b95730e): Adding StepResource setup and teardown to workflow graph.
2017-05-10T06:02:05.767Z: Basic: (75edc98e2ecf8a50): Executing operation Read/DataflowPipelineRunner.BatchBigQueryIONativeRead+AnonymousParDo+WriteToBigQuery/DataflowPipelineRunner.BatchBigQueryIOWrite/DataflowPipelineRunner.BatchBigQueryIONativeWrite
2017-05-10T06:02:05.973Z: Basic: (c9b9bc749d188107): Starting 5 workers...
2017-05-10T06:02:07.735Z: Basic: (4903a2f536f5c1ae): BigQuery query issued as job: "dataflow_job_4354447435307355184". You can check its status with the bq tool: "bq show -j --project_id=com-dl-parts-dev dataflow_job_4354447435307355184".
2017-05-10T06:03:03.727Z: Detail: (376dc1d89cc4c16e): Workers have started successfully.
2017-05-10T06:04:09.561Z: Basic: (4903a2f536f5c868): BigQuery query completed, job : "dataflow_job_4354447435307355184"
2017-05-10T06:04:09.987Z: Basic: (4903a2f536f5cfbf): BigQuery export job "dataflow_job_5261227963771635305" started. You can check its status with the bq tool: "bq show -j --project_id=com-dl-parts-dev dataflow_job_5261227963771635305".
2017-05-10T06:05:10.798Z: Detail: (d0c3fbda1e314661): BigQuery export job progress: "dataflow_job_5261227963771635305" observed total of 1 exported files thus far.
2017-05-10T06:05:10.801Z: Basic: (d0c3fbda1e31498f): BigQuery export job finished: "dataflow_job_5261227963771635305"
2017-05-10T06:06:12.303Z: Basic: (d51372b306784d58): Autoscaling: Resized worker pool from 5 to 1.
2017-05-10T06:06:12.306Z: Detail: (d51372b306784f56): Autoscaling: Reduced the number of workers to 1 based on the rate of progress in the currently running step(s).
2017-05-10T06:07:38.000Z: Basic: (4e3c32839262218d): Autoscaling: Attempting to upscale worker pool to 2.
2017-05-10T06:08:11.074Z: Basic: (4e3c328392622142): Autoscaling: Resized worker pool from 1 to 2.
2017-05-10T06:08:11.077Z: Detail: (4e3c32839262213c): Autoscaling: Raised the number of workers to 2 based on the ability to parallelize the work in the currently running step(s).
2017-05-10T06:13:41.023Z: Basic: (ae82479176c127a3): Autoscaling: Resized worker pool from 2 to 1.
2017-05-10T06:13:41.027Z: Detail: (ae82479176c129d5): Autoscaling: Reduced the number of workers to 1 based on the ability to parallelize the work in the currently running step(s).
2017-05-10T06:20:31.844Z: Basic: (a41dfc71af8c36fd): Executing BigQuery import job "dataflow_job_4354447435307352182". You can check its status with the bq tool: "bq show -j --project_id=com-dl-parts-dev dataflow_job_4354447435307352182".

Update: Is this the right way to prevent fusion?

I am reading a BigQuery table using BigQuery IO. This input gives me the product number in each record.
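The read step looks roughly like this (a sketch only; the query and table name are placeholders, not my real ones):

PCollection<TableRow> quotes2 = pipelineTransform.apply(
    BigQueryIO.Read
        .named("Read")
        .fromQuery("SELECT APRODUCT FROM [com-dl-parts:ds_parts.product_table]")); // placeholder query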

After that I apply a ParDo operation. Within its processElement function I run some data forecast operations for each product that I get from the input.

PCollection<TableRow> quotes3 = quotes2.apply(ParDo.of(new DoFn<TableRow, TableRow>() {
    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(ProcessContext c) throws Exception {
        TableRow rowData = c.element();
        // Run the (computationally heavy) forecast for this product's row.
        TableRow tableRowData = ForcastOperation(rowData);
        c.output(tableRowData);
    }
}));

In the last step I dump the forecast result into BigQuery using the Dataflow pipeline.

quotes3.apply(BigQueryIO.Write
        .named("WriteToBigQuery")
        .to("com-dl-parts:ds_parts.output_data")
        .withSchema(schema)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

pipelineTransform.run();

Update: 17/05/2017 13:38

I am trying to break the fusion in the way listed below. It scaled the GroupByKey apply operation up to 308 nodes, but I am not sure whether it is scaling the second ParDo (the one with the ForcastOperation method) up to 308 nodes as well.

PCollection<String> quotes1 = quotes.apply(ParDo.of(new DoFn<TableRow, KV<String, String>>() {
    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(ProcessContext c) throws Exception {
        TableRow rowData = c.element();
        // Key each record by its product number.
        String product = rowData.get("APRODUCT").toString();
        c.output(KV.of(product, product));
    }
})).apply(GroupByKey.<String, String>create())
   .apply(Values.<Iterable<String>>create())
   .apply(Flatten.<String>iterables());



PCollection<TableRow> quotes3 = quotes1.apply(ParDo.of(new DoFn<String, TableRow>() {
    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(ProcessContext c) throws Exception {
        // After the GroupByKey/Flatten above, each element is just the product number string,
        // so ForcastOperation has to work from the product number here.
        String product = c.element();
        TableRow tableRowData = ForcastOperation(product);
        c.output(tableRowData);
    }
}));

In the last step I dump the forecast result into BigQuery using the Dataflow pipeline.

quotes3.apply(BigQueryIO.Write
        .named("WriteToBigQuery")
        .to("com-dl-parts:ds_parts.output_data")
        .withSchema(schema)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

pipelineTransform.run();
Please provide insights into how the pipeline is structured. After that we can help you identify issues (if any) within the steps so that your job can be massively parallelised! – Paritosh
What exactly is the problem/question? With autoscaling on, you're telling Dataflow to pick the best number of workers that it deems fit for the amount of data it needs to process. This is a normal message to receive as workers come in and out of the pool. In your case, Dataflow made the decision to run with just 1 worker, I'm guessing because the data size/volume isn't that big. – Graham Polley
Hi Graham, my requirement is to run 1000 nodes in parallel, one for each product that I am fetching from the first query in the quotes2 Dataflow pipeline. My assumption is that 1000 nodes should process the first 1000 products in the ParDo in parallel, and so on. Is there any way to achieve this? – Manoj Kumar
I still don't get it. Sorry. – Graham Polley

1 Answer

3 votes

I looked at the logs of this job and it appears that the amount of data it has read from BigQuery as input is extremely small - on the order of 1 KB. Is this expected?

If yes, and if you still want to parallelize processing of your 1 KB of data 1000 ways, then I assume your ForecastOperation function is extremely computationally intensive per element.

In that case you need to break fusion between reading from BigQuery and applying ForecastOperation. Please see https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion
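For example, one way to apply the pattern from that page with the 1.x SDK is to reshuffle the rows through a GroupByKey on a random key before the forecast ParDo, so that each element keeps its full TableRow (a minimal, untested sketch; quotes2 is the read output from your question):

PCollection<TableRow> reshuffled = quotes2
    .apply(ParDo.named("AddRandomKey").of(new DoFn<TableRow, KV<Integer, TableRow>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(ProcessContext c) throws Exception {
            // Pair each row with a random key; the GroupByKey below forces a
            // redistribution of elements, which breaks fusion with the read step.
            c.output(KV.of(java.util.concurrent.ThreadLocalRandom.current().nextInt(1000), c.element()));
        }
    }))
    .apply(GroupByKey.<Integer, TableRow>create())
    .apply(Values.<Iterable<TableRow>>create())
    .apply(Flatten.<TableRow>iterables());

// Apply the ParDo with ForecastOperation to 'reshuffled' instead of directly to the read output.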