Dataflow does not distribute the processing; everything runs on one node.

I created the following program and validated that it works correctly with a small amount of data.

Read data from BigQuery → DoFn processing → Combine processing → Flatten processing → Combine processing → Flatten processing → Write data to BigQuery.

Next, I tested with large data to check that it works in a multi-node parallel environment.

I specified numWorkers and --autoscalingAlgorithm=NONE as optional parameters when starting the Dataflow job.

Since it took a very long time to execute, I investigated.

  • I checked the execution status in the Dataflow job view. The Combine step is where the time is spent.
  • I inspected the machine metrics of the GCE VM instances. One machine is consuming resources and doing the work, while the other machines are idle.
  • I checked the logs in Stackdriver. The addInput calls of the Combine step are all being performed on the single active machine identified from the metrics above.
  • Looking at other logs in Stackdriver, I occasionally see the message "Discarding invalid work item null" on an idle machine.

By the way, if I do not specify the startup options numWorkers and --autoscalingAlgorithm=NONE, that is, if autoscaling is left enabled, only one node is started.

I thought that a program written according to Beam's idioms would be distributed across many nodes by Dataflow in a "good" way, but it behaves differently than I expected.

How can I make it distribute the work properly?

Given that the job spends its time in the Combine step, I think this could be related to hot keys, since values with the same key have to be processed on the same VM. - enle lin
I understand hot keys to mean "a big bias in the distribution of keys during grouping". Let me add some more explanation. The Combine step I created runs a CombineFn implementation in Combine.globally. It processes relatively large data using a custom accumulator. - Naoki Hyu
This program merges tens of thousands of records under certain conditions to reduce the number of rows. Ideally, the shuffled data would be distributed evenly and reduced on each node, the accumulators would be merged in stages, and the final result would eventually be combined on one node. But it does not work that way... In this use case I do not think there is any bias in the data being processed, so I think hot keys are irrelevant. - Naoki Hyu
I wonder if it's the second Combine that's slow. After the first Combine.globally the PCollection has 1 element on one node. Could you try adding a Reshuffle.viaRandomKey() between the first Flatten and the second Combine? That might spread the data across workers. - Udi Meiri
Well, the "not being distributed" phenomenon already happens in the first Combine. Should I reshuffle before that? - Naoki Hyu
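
The staged merging described in the comments above can be sketched in plain Java, without Beam. This is only an illustration under assumptions: the static method names mirror the CombineFn lifecycle (createAccumulator, addInput, mergeAccumulators, extractOutput), but the Acc class, the averaging logic, and the round-robin split into "workers" are hypothetical stand-ins, not the asker's accumulator and not how Dataflow actually assigns work.

```java
import java.util.ArrayList;
import java.util.List;

public class StagedCombineSketch {
    /** Hypothetical accumulator: a row count and a running sum. */
    static final class Acc {
        long count;
        long sum;
    }

    static Acc createAccumulator() {
        return new Acc();
    }

    /** Folds one input value into a partial accumulator. */
    static Acc addInput(Acc acc, long value) {
        acc.count++;
        acc.sum += value;
        return acc;
    }

    /** Merges partial accumulators; must give the same result in any grouping. */
    static Acc mergeAccumulators(List<Acc> accs) {
        Acc merged = createAccumulator();
        for (Acc a : accs) {
            merged.count += a.count;
            merged.sum += a.sum;
        }
        return merged;
    }

    static double extractOutput(Acc acc) {
        return acc.count == 0 ? 0.0 : (double) acc.sum / acc.count;
    }

    /** Simulates `workers` nodes: each builds a partial accumulator, then all are merged. */
    static double combineInStages(List<Long> input, int workers) {
        List<Acc> partials = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            partials.add(createAccumulator());
        }
        for (int i = 0; i < input.size(); i++) {
            // Round-robin assignment stands in for an even distribution of elements.
            addInput(partials.get(i % workers), input.get(i));
        }
        return extractOutput(mergeAccumulators(partials));
    }

    public static void main(String[] args) {
        List<Long> rows = new ArrayList<>();
        for (long i = 1; i <= 100; i++) {
            rows.add(i);
        }
        System.out.println(combineInStages(rows, 1)); // one node: 50.5
        System.out.println(combineInStages(rows, 4)); // four nodes: 50.5
    }
}
```

The point of the sketch is that addInput can only run in parallel if the elements reach more than one worker before the Combine; if every element arrives on the same worker, all addInput calls happen there, which matches what the logs showed.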

1 Answer

I did it! Great! Thank you!! Adding Reshuffle.viaRandomKey() right after the BigQuery read was what I needed:

p.apply("ReadFromBQ", BigQueryIO.readTableRows().fromQuery(query).usingStandardSql().withTemplateCompatibility())
    .apply("shuffle as expressly", Reshuffle.viaRandomKey())
    .apply("convert table row", ........
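
As a rough intuition for why the reshuffle step above helps, here is a plain-Java sketch of spreading elements by a random key. It is a simplification under assumptions: the class and method names are hypothetical, the fixed seed is only there to make the run repeatable, and taking the key in the range of the worker count is a shortcut (the real transform attaches a random key to each element and leaves the distribution to the runner's shuffle). A plausible reading of the original problem, not confirmed in this thread, is that the read and the following steps were executed together on one worker until the reshuffle forced a redistribution.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class RandomKeySketch {
    /** Assigns each element a random key and groups elements by that key. */
    static List<List<String>> spread(List<String> elements, int workers, long seed) {
        Random random = new Random(seed);
        List<List<String>> buckets = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            buckets.add(new ArrayList<>());
        }
        for (String element : elements) {
            // The key carries no meaning; it only decides where the element goes.
            int key = random.nextInt(workers);
            buckets.get(key).add(element);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            rows.add("row-" + i);
        }
        List<List<String>> buckets = spread(rows, 4, 42L);
        for (int w = 0; w < buckets.size(); w++) {
            System.out.println("worker " + w + ": " + buckets.get(w).size() + " rows");
        }
    }
}
```

Because the keys are random rather than derived from the data, the split stays roughly even regardless of what the rows contain, so this kind of redistribution does not introduce hot keys of its own.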