I am trying to run Google Dataflow for a simulation process. I started the job with a maximum of 5 worker nodes and autoscaling enabled (THROUGHPUT_BASED).
The issue is that Dataflow is not utilizing all 5 worker nodes and gives me this message:
Autoscaling: Reduced the number of workers to 1 based on the ability to parallelize the work in the currently running step(s).
Please suggest what the issue could be.
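For reference, this is roughly how I set the pipeline options (Dataflow SDK 1.x; the staging bucket below is a placeholder):

// com.google.cloud.dataflow.sdk (Dataflow SDK 1.x)
DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject("com-dl-parts-dev");
options.setStagingLocation("gs://my-staging-bucket/staging");  // placeholder bucket
options.setMaxNumWorkers(5);  // cap the worker pool at 5 nodes
options.setAutoscalingAlgorithm(
    DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);
Pipeline pipelineTransform = Pipeline.create(options);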
Logs are attached below.
2017-05-10T06:01:36.555Z: Detail: (5de468ab73bd7581): Autoscaling is enabled for job 2017-05-09_23_01_36-6765647625807820060. The number of workers will be between 1 and 1000.
2017-05-10T06:01:37.811Z: Detail: (feb335244b957ccc): Checking required Cloud APIs are enabled.
2017-05-10T06:02:05.328Z: Detail: (feb335244b9573fd): Expanding GroupByKey operations into optimizable parts.
2017-05-10T06:02:05.332Z: Detail: (feb335244b957017): Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
2017-05-10T06:02:05.714Z: Detail: (feb335244b957a72): Fusing adjacent ParDo, Read, Write, and Flatten operations
2017-05-10T06:02:05.716Z: Detail: (feb335244b95768c): Fusing consumer AnonymousParDo into Read/DataflowPipelineRunner.BatchBigQueryIONativeRead
2017-05-10T06:02:05.718Z: Detail: (feb335244b9572a6): Fusing consumer WriteToBigQuery/DataflowPipelineRunner.BatchBigQueryIOWrite/DataflowPipelineRunner.BatchBigQueryIONativeWrite into AnonymousParDo
2017-05-10T06:02:05.728Z: Detail: (feb335244b95730e): Adding StepResource setup and teardown to workflow graph.
2017-05-10T06:02:05.767Z: Basic: (75edc98e2ecf8a50): Executing operation Read/DataflowPipelineRunner.BatchBigQueryIONativeRead+AnonymousParDo+WriteToBigQuery/DataflowPipelineRunner.BatchBigQueryIOWrite/DataflowPipelineRunner.BatchBigQueryIONativeWrite
2017-05-10T06:02:05.973Z: Basic: (c9b9bc749d188107): Starting 5 workers...
2017-05-10T06:02:07.735Z: Basic: (4903a2f536f5c1ae): BigQuery query issued as job: "dataflow_job_4354447435307355184". You can check its status with the bq tool: "bq show -j --project_id=com-dl-parts-dev dataflow_job_4354447435307355184".
2017-05-10T06:03:03.727Z: Detail: (376dc1d89cc4c16e): Workers have started successfully.
2017-05-10T06:04:09.561Z: Basic: (4903a2f536f5c868): BigQuery query completed, job : "dataflow_job_4354447435307355184"
2017-05-10T06:04:09.987Z: Basic: (4903a2f536f5cfbf): BigQuery export job "dataflow_job_5261227963771635305" started. You can check its status with the bq tool: "bq show -j --project_id=com-dl-parts-dev dataflow_job_5261227963771635305".
2017-05-10T06:05:10.798Z: Detail: (d0c3fbda1e314661): BigQuery export job progress: "dataflow_job_5261227963771635305" observed total of 1 exported files thus far.
2017-05-10T06:05:10.801Z: Basic: (d0c3fbda1e31498f): BigQuery export job finished: "dataflow_job_5261227963771635305"
2017-05-10T06:06:12.303Z: Basic: (d51372b306784d58): Autoscaling: Resized worker pool from 5 to 1.
2017-05-10T06:06:12.306Z: Detail: (d51372b306784f56): Autoscaling: Reduced the number of workers to 1 based on the rate of progress in the currently running step(s).
2017-05-10T06:07:38.000Z: Basic: (4e3c32839262218d): Autoscaling: Attempting to upscale worker pool to 2.
2017-05-10T06:08:11.074Z: Basic: (4e3c328392622142): Autoscaling: Resized worker pool from 1 to 2.
2017-05-10T06:08:11.077Z: Detail: (4e3c32839262213c): Autoscaling: Raised the number of workers to 2 based on the ability to parallelize the work in the currently running step(s).
2017-05-10T06:13:41.023Z: Basic: (ae82479176c127a3): Autoscaling: Resized worker pool from 2 to 1.
2017-05-10T06:13:41.027Z: Detail: (ae82479176c129d5): Autoscaling: Reduced the number of workers to 1 based on the ability to parallelize the work in the currently running step(s).
2017-05-10T06:20:31.844Z: Basic: (a41dfc71af8c36fd): Executing BigQuery import job "dataflow_job_4354447435307352182". You can check its status with the bq tool: "bq show -j --project_id=com-dl-parts-dev dataflow_job_4354447435307352182".
Update: Is this the right way to prevent fusion?
I am reading a BigQuery table using BigQuery IO. Each input record gives me a product number.
After that I apply a ParDo operation. Within the processElement function I run some data forecast operations for each product received from the input.
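For context, quotes2 comes from a BigQuery read along these lines (the source table name here is a placeholder, and any intermediate steps are omitted):

PCollection<TableRow> quotes2 = pipelineTransform.apply(BigQueryIO.Read
    .named("ReadFromBigQuery")
    .from("com-dl-parts:ds_parts.product_data"));  // placeholder table; each TableRow carries a product number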
PCollection<TableRow> quotes3 = quotes2.apply(ParDo.of(new DoFn<TableRow, TableRow>() {
    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(ProcessContext c) throws Exception {
        TableRow rowData = c.element();
        // Run the forecast for the product contained in this row.
        TableRow tableRowData = ForcastOperation(rowData);
        c.output(tableRowData);
    }
}));
In the last step I dump the forecast result into BigQuery using the Dataflow pipeline.
quotes3.apply(BigQueryIO.Write
    .named("WriteToBigQuery")
    .to("com-dl-parts:ds_parts.output_data")
    .withSchema(schema)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
pipelineTransform.run();
Update: 17/05/2017 13:38
I am trying to break the fusion in the way listed below. It scaled the GroupByKey apply operation up to 308 nodes, but I am not sure whether the second ParDo (the one containing the ForcastOperation method) is also scaling up to 308 nodes.
PCollection<String> quotes1 = quotes.apply(ParDo.of(new DoFn<TableRow, KV<String, String>>() {
    private static final long serialVersionUID = 1L;
    private Random random = new Random();

    @Override
    public void processElement(ProcessContext c) throws Exception {
        TableRow rowData = c.element();
        // Key each record by its product number so the GroupByKey can redistribute the work.
        c.output(KV.of(rowData.get("APRODUCT").toString(), rowData.get("APRODUCT").toString()));
    }
}))
.apply(GroupByKey.<String, String>create())
.apply(Values.<Iterable<String>>create())
.apply(Flatten.<String>iterables());
PCollection<TableRow> quotes3 = quotes1.apply(ParDo.of(new DoFn<String, TableRow>() {
    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(ProcessContext c) throws Exception {
        // After the GroupByKey the element is the product number string, not a TableRow,
        // so ForcastOperation is assumed here to accept the product number.
        String product = c.element();
        TableRow tableRowData = ForcastOperation(product);
        c.output(tableRowData);
    }
}));
In the last step I dump the forecast result into BigQuery using the Dataflow pipeline.
quotes3.apply(BigQueryIO.Write
    .named("WriteToBigQuery")
    .to("com-dl-parts:ds_parts.output_data")
    .withSchema(schema)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
pipelineTransform.run();