I have two Dataflow streaming pipelines (Pub/Sub to BigQuery) with the following code:

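import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class transform_class(beam.DoFn):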

    def process(self, element, publish_time=beam.DoFn.TimestampParam, *args, **kwargs):
        logging.info(element)
        yield element

class identify_and_transform_tables(beam.DoFn):
    # Adds the publish timestamp.
    # The topic carries data from multiple tables, so this DoFn
    # identifies the tables and splits them apart.


def run(pipeline_args=None):
    # `save_main_session` is set to true because some DoFn's rely on
    # globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        lines = (pipeline 
                | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(topic='topic name',with_attributes = True)
                | 'Transforming Messages' >> beam.ParDo(transform_class())
                | 'Identify Tables' >> beam.ParDo(identify_and_transform_tables()).with_outputs('table_name'))

        table_name = lines.table_name
        table_name = (table_name 
                        | 'Write table_name to BQ' >> beam.io.WriteToBigQuery(
                        table='table_name',
                        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
                        )

    # No explicit pipeline.run() is needed: leaving the `with` block runs the pipeline.

Both pipelines read from the same Pub/Sub topic. While reconciling, I found that some data was missing, and that the missing data differed between the two pipelines. For example:

Rows 56-62 were missing from pipeline 1 but present in pipeline 2
Rows 90-95 were missing from pipeline 2 but present in pipeline 1

This means the data was present in the Pub/Sub topic.
As you can see in the code, the first function logs each Pub/Sub message directly to Stackdriver. I checked for the missing data in the Stackdriver logs as well as in BigQuery.

I also found that the missing data falls in contiguous chunks of time. For example, rows 56-62 all have timestamps at or very close to '2019-12-03 05:52:18.754150 UTC' (within milliseconds of each other).

So my only conclusion is that Dataflow's ReadFromPubSub occasionally loses data?
Any assistance is deeply appreciated.

1 Answer

I'm not sure what happened in this case, but there is an important rule to follow to prevent data loss:

  • Don't read from a topic, as in beam.io.ReadFromPubSub(topic='topic name').
  • Do read from a subscription, as in beam.io.ReadFromPubSub(subscription='subscription name').

This is because, in case of a restart, a new subscription will be created in the first case, and that subscription might only contain data received after it was created. If you create the subscription beforehand, data will be retained in it until it's read (or until it expires).
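
As a minimal sketch of the rule above, the read step could be pointed at a pre-created subscription like this. The project and subscription names (`my-project`, `my-subscription`) and the helpers `subscription_path` and `build_read` are hypothetical, used only for illustration. `ReadFromPubSub` expects the fully qualified form `projects/<project>/subscriptions/<subscription>`.

```python
def subscription_path(project: str, subscription: str) -> str:
    """Build the fully qualified subscription name that ReadFromPubSub expects."""
    return f"projects/{project}/subscriptions/{subscription}"


def build_read(project: str, subscription: str):
    """Return a ReadFromPubSub transform bound to an existing subscription.

    The subscription is assumed to have been created before the pipeline starts.
    """
    # Imported lazily so the path helper can be used without Beam installed.
    import apache_beam as beam

    return beam.io.ReadFromPubSub(
        subscription=subscription_path(project, subscription),
        with_attributes=True,
    )


# Hypothetical names, for illustration only.
SUBSCRIPTION = subscription_path("my-project", "my-subscription")
```

In the question's pipeline, `build_read(...)` would take the place of `beam.io.ReadFromPubSub(topic='topic name', with_attributes=True)`. Since two pipelines are involved, each would presumably need its own subscription: subscribers that share one subscription split the messages between them rather than each receiving a full copy.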