I'm assuming you are reading from BigQuery like this:
count = (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input_table)))
I dug into the apache_beam source code a bit, and it looks like its Source transforms disregard the input PCollection, which is why they get set up in parallel.
See the last line of def expand(self, pbegin) below:
class Read(ptransform.PTransform):
  """A transform that reads a PCollection."""

  def __init__(self, source):
    """Initializes a Read transform.

    Args:
      source: Data source to read from.
    """
    super(Read, self).__init__()
    self.source = source

  def expand(self, pbegin):
    from apache_beam.options.pipeline_options import DebugOptions
    from apache_beam.transforms import util

    assert isinstance(pbegin, pvalue.PBegin)
    self.pipeline = pbegin.pipeline

    debug_options = self.pipeline._options.view_as(DebugOptions)
    if debug_options.experiments and 'beam_fn_api' in debug_options.experiments:
      source = self.source

      def split_source(unused_impulse):
        total_size = source.estimate_size()
        if total_size:
          chunk_size = max(1 << 20, 1000 * int(math.sqrt(total_size)))
        else:
          chunk_size = 64 << 20
        return source.split(chunk_size)

      return (
          pbegin
          | core.Impulse()
          | 'Split' >> core.FlatMap(split_source)
          | util.Reshuffle()
          | 'ReadSplits' >> core.FlatMap(lambda split: split.source.read(
              split.source.get_range_tracker(
                  split.start_position, split.stop_position))))
    else:
      return pvalue.PCollection(self.pipeline)
It looks like if you set the experimental beam_fn_api pipeline debug option, then pbegin would actually get used, but I'm not sure what the other effects of that option are.
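If you want to experiment with it, a minimal sketch of enabling that experiment would look something like the following. This assumes DebugOptions.add_experiment is available in your Beam version, and it says nothing about whatever else the beam_fn_api path changes.

import apache_beam as beam
from apache_beam.options.pipeline_options import DebugOptions, PipelineOptions

options = PipelineOptions()
# Register the experiment programmatically; passing --experiments=beam_fn_api
# on the command line should have the same effect.
options.view_as(DebugOptions).add_experiment('beam_fn_api')

p = beam.Pipeline(options=options)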
Why do you need them to happen sequentially? You seem to be writing to one table and then reading from another.
If you really need this to happen sequentially, maybe subclassing Read like this would do the trick:
class SequentialRead(Read):
  def expand(self, pbegin):
    return pbegin
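The wiring would then look roughly like the untested sketch below. previous_output is a placeholder for whatever PCollection your earlier write step produces, and you'd still have to verify that the runner actually treats this as an ordering dependency rather than a pass-through.

# Untested sketch: hang the read off the earlier step's output instead of the
# pipeline root, so the graph records a dependency between the two stages.
count = (previous_output  # placeholder for the PCollection from your write step
         | 'read' >> SequentialRead(beam.io.BigQuerySource(known_args.input_table)))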