3
votes

I am creating a pipeline on Dataflow (Apache Beam) to read and write data on Google BigQuery; however, I am having trouble creating a DAG the way I would with Airflow.

This is an example from my code:

# define pipeline
p = beam.Pipeline(argv=pipeline_args)
# execute query_1
query_result_gps = ( p | 'ReadFromBQ GPS_data' >> ... )
# write result from query_1 on BigQuery
output_gps = ( query_result_gps | 'WriteToBQ GPS_data' >> ... )
# execute query_2
query_result_temperature = ( output_gps
                             | 'ReadFromBQ temperature_data' >> ... )
# write result from query_2
output_temperature = ( query_result_temperature | 'WriteToBQ temperature_data' >> ... )

I'd expect these tasks to execute SEQUENTIALLY; instead, Dataflow executes them in PARALLEL.


How do I have them execute sequentially?


2 Answers

4
votes

I'm assuming you are reading from BigQuery like this:

count = (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input_table)))

I dug into the apache_beam source code a bit, and it looks like the Source transforms disregard the input PCollection, which is why they get set up in parallel.

See the last line of def expand(self, pbegin):

class Read(ptransform.PTransform):
  """A transform that reads a PCollection."""

  def __init__(self, source):
    """Initializes a Read transform.

    Args:
      source: Data source to read from.
    """
    super(Read, self).__init__()
    self.source = source

  def expand(self, pbegin):
    from apache_beam.options.pipeline_options import DebugOptions
    from apache_beam.transforms import util

    assert isinstance(pbegin, pvalue.PBegin)
    self.pipeline = pbegin.pipeline

    debug_options = self.pipeline._options.view_as(DebugOptions)
    if debug_options.experiments and 'beam_fn_api' in debug_options.experiments:
      source = self.source

      def split_source(unused_impulse):
        total_size = source.estimate_size()
        if total_size:
          # 1MB = 1 shard, 1GB = 32 shards, 1TB = 1000 shards, 1PB = 32k shards
          chunk_size = max(1 << 20, 1000 * int(math.sqrt(total_size)))
        else:
          chunk_size = 64 << 20  # 64mb
        return source.split(chunk_size)

      return (
          pbegin
          | core.Impulse()
          | 'Split' >> core.FlatMap(split_source)
          | util.Reshuffle()
          | 'ReadSplits' >> core.FlatMap(lambda split: split.source.read(
              split.source.get_range_tracker(
                  split.start_position, split.stop_position))))
    else:
      # Treat Read itself as a primitive.
      return pvalue.PCollection(self.pipeline)

# ... other methods

It looks like if you set the experimental beam_fn_api pipeline debug option then pbegin would actually get used, but I'm not sure what the other effects of that option are.
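
If you want to try it, here's a minimal sketch of how that experiment could be switched on through the pipeline options (untested; I don't know whether it actually changes the execution order here):

import apache_beam as beam
from apache_beam.options.pipeline_options import DebugOptions, PipelineOptions

# reuse the usual command-line args and switch on the beam_fn_api experiment
# that the expand() branch above checks for
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(DebugOptions).add_experiment('beam_fn_api')

p = beam.Pipeline(options=pipeline_options)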

Why do you need them to happen sequentially? You seem to be writing to one table and then reading from another?

If you really need this to happen sequentially, maybe subclassing Read like this would do the trick:

class SequentialRead(Read):
  def expand(self, pbegin):
      return pbegin
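
A hypothetical sketch of how that could be wired into the pipeline from the question (untested; the BigQuery source below is only a placeholder):

# feed the upstream write's output into the subclassed read so this step
# is chained after it instead of becoming another root of the graph
query_result_temperature = (
    output_gps
    | 'ReadFromBQ temperature_data' >> SequentialRead(
        beam.io.BigQuerySource(query='...')))  # placeholder query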

2
votes

Since you want both to write the intermediate step to BigQuery and to flow the data between the two transforms, I think a branch would achieve your desired outcome.

pcollection_1 = (p | 'Read From BQ' >> ... | 'Transform_1' >> ...)

pcollection_1 | 'Write to BQ' >> ...

pcollection_1 | 'Transform_2' >> ... | 'Write to BQ 2' >> ...

This will allow you to apply Transform_2 to the elements after they have gone through Transform_1 and to write that intermediate step to BQ. By applying multiple transforms to the same PCollection you generate different branches in the DAG.
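
Applied to the pipeline from the question, the branch might look roughly like this (a sketch only: the '...' reads/transforms/sinks are the question's placeholders, and the 'Compute temperature_data' step name is made up):

gps_data = p | 'ReadFromBQ GPS_data' >> ...

# branch 1: persist the GPS result
gps_data | 'WriteToBQ GPS_data' >> ...

# branch 2: derive the temperature result from the same PCollection and write it
(gps_data
    | 'Compute temperature_data' >> ...
    | 'WriteToBQ temperature_data' >> ...)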