3
votes

We started using Dataflow to read from Pub/Sub and stream to BigQuery. The Dataflow job should run 24/7, because the Pub/Sub topic is constantly updated with analytics data from multiple websites around the world.

The code looks like this:

from __future__ import absolute_import

import argparse
import json
import logging

import apache_beam as beam
from apache_beam.io import ReadFromPubSub, WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

logger = logging.getLogger()

TABLE_IDS = {
    'table_1': 0,
    'table_2': 1,
    'table_3': 2,
    'table_4': 3,
    'table_5': 4,
    'table_6': 5,
    'table_7': 6,
    'table_8': 7,
    'table_9': 8,
    'table_10': 9,
    'table_11': 10,
    'table_12': 11,
    'table_13': 12
}


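# Partition function: Beam passes each element plus the total number of
# partitions; return the index of the destination table based on 'meta_type'.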
def separate_by_table(element, num):
    return TABLE_IDS[element.get('meta_type')]


class ExtractingDoFn(beam.DoFn):
    def process(self, element):
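        # Each Pub/Sub message payload is a JSON string; parse it into a dict.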
        yield json.loads(element)


def run(argv=None):
    """Main entry point; defines and runs the wordcount pipeline."""
    logger.info('STARTED!')
    parser = argparse.ArgumentParser()
    parser.add_argument('--topic',
                        dest='topic',
                        default='projects/PROJECT_NAME/topics/TOPICNAME',
                        help='Gloud topic in form "projects/<project>/topics/<topic>"')
    parser.add_argument('--table',
                        dest='table',
                        default='PROJECTNAME:DATASET_NAME.event_%s',
                        help='Gloud topic in form "PROJECT:DATASET.TABLE"')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)

    lines = p | ReadFromPubSub(known_args.topic)
    datas = lines | beam.ParDo(ExtractingDoFn())
    by_table = datas | beam.Partition(separate_by_table, 13)

    # Create a stream for each table
    for table, table_id in TABLE_IDS.items():
        by_table[table_id] | 'write to %s' % table >> WriteToBigQuery(known_args.table % table)

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logger.setLevel(logging.INFO)
    run()

It works fine, but after some time (2-3 days) it stops streaming for no apparent reason. When I check the job status, it contains no errors in the logs section (the ones marked with a red "!" in the Dataflow job details). If I cancel the job and run it again, it starts working again as usual. If I check Stackdriver for additional logs, here are all the errors that happened: Errors list. Here are some warnings that occur periodically while the job executes: Warnings list. Details of one of them:

{
  insertId: "397122810208336921:865794:0:479132535"
  jsonPayload: {
    exception: "java.lang.IllegalStateException: Cannot be called on unstarted operation.
      at com.google.cloud.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.getElementsSent(RemoteGrpcPortWriteOperation.java:111)
      at com.google.cloud.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.updateProgress(BeamFnMapTaskExecutor.java:293)
      at com.google.cloud.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.periodicProgressUpdate(BeamFnMapTaskExecutor.java:280)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)"
    job: "2018-11-30_10_35_19-13557985235326353911"
    logger: "com.google.cloud.dataflow.worker.fn.control.BeamFnMapTaskExecutor"
    message: "Progress updating failed 4 times. Following exception safely handled."
    stage: "S0"
    thread: "62"
    work: "c-8756541438010208464"
    worker: "beamapp-vitar-1130183512--11301035-mdna-harness-lft7"
  }
  labels: {
    compute.googleapis.com/resource_id: "397122810208336921"
    compute.googleapis.com/resource_name: "beamapp-vitar-1130183512--11301035-mdna-harness-lft7"
    compute.googleapis.com/resource_type: "instance"
    dataflow.googleapis.com/job_id: "2018-11-30_10_35_19-13557985235326353911"
    dataflow.googleapis.com/job_name: "beamapp-vitar-1130183512-742054"
    dataflow.googleapis.com/region: "europe-west1"
  }
  logName: "projects/PROJECTNAME/logs/dataflow.googleapis.com%2Fharness"
  receiveTimestamp: "2018-12-03T20:33:00.444208704Z"
  resource: {
    labels: {
      job_id: "2018-11-30_10_35_19-13557985235326353911"
      job_name: "beamapp-vitar-1130183512-742054"
      project_id: PROJECTNAME
      region: "europe-west1"
      step_id: ""
    }
    type: "dataflow_step"
  }
  severity: "WARNING"
  timestamp: "2018-12-03T20:32:59.442Z"
}

Here's the moment when it seems to start having problems: Problem arose. Additional info messages that may help: Info messages.

According to these messages, we don't run out of memory, processing power, etc. The job is run with these parameters:

python -m start --streaming True --runner DataflowRunner --project PROJECTNAME --temp_location gs://BUCKETNAME/tmp/ --region europe-west1 --disk_size_gb 30 --machine_type n1-standard-1 --use_public_ips false --num_workers 1 --max_num_workers 1 --autoscaling_algorithm NONE

What could be the problem here?

3 Answers

2
votes

This isn't really an answer, but it may help identify the cause: so far, all streaming Dataflow jobs I've launched with the Python SDK have stopped this way after some days, whether or not they use BigQuery as a sink. So the cause seems to be the more general fact that streaming jobs with the Python SDK are still in beta.

My personal solution: use the Dataflow templates to stream from Pub/Sub to BigQuery (thus avoiding the Python SDK), then schedule queries in BigQuery to periodically process the data. Unfortunately, that might not be appropriate for your use case.
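For reference, launching the Google-provided Pub/Sub-to-BigQuery template from Python could look roughly like the sketch below. This is only a sketch: the template path, the parameter names (inputTopic, outputTableSpec), and the project/topic/table values are assumptions based on the standard template, not something taken from the question.

from googleapiclient.discovery import build

# Hypothetical placeholders; replace with your own project, topic and table.
PROJECT = 'PROJECTNAME'
TOPIC = 'projects/PROJECTNAME/topics/TOPICNAME'
TABLE = 'PROJECTNAME:DATASET_NAME.events_raw'
TEMPLATE = 'gs://dataflow-templates/latest/PubSub_to_BigQuery'


def launch_template():
    # Build a client for the Dataflow REST API (v1b3) and launch the template.
    dataflow = build('dataflow', 'v1b3')
    request = dataflow.projects().locations().templates().launch(
        projectId=PROJECT,
        location='europe-west1',
        gcsPath=TEMPLATE,
        body={
            'jobName': 'pubsub-to-bq-events',
            'parameters': {
                'inputTopic': TOPIC,
                'outputTableSpec': TABLE,
            },
        },
    )
    return request.execute()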

1
votes

In my company we are experiencing the same problem described by the OP, with a similar use case.

Unfortunately the problem is real, concrete, and apparently occurs at random.

As a workaround, we are considering rewriting our pipeline using the Java SDK.

0
votes

I had a similar issue and found that the warning logs contained a Python stack trace hidden in the Java logs, pointing to the actual errors.

These errors were continually retried by the workers, causing them to crash and completely freeze the pipeline. I initially thought the number of workers was too low, so I scaled it up, but the pipeline just took longer to freeze.

I ran the pipeline locally, exported the Pub/Sub messages as text, and found that they contained dirty data (messages that did not match the BigQuery table schema); since I had no exception handling, that seemed to be what caused the pipeline to freeze.

Adding a function that only accepts records whose first key matches the expected column of your BigQuery schema fixed my issue, and the Dataflow job has been running with no issues since:

def bad_records(row):
    # Keep only rows that contain the expected key; log and drop everything else.
    if 'key1' in row:
        yield row
    else:
        print('bad row', row)


| 'exclude bad records' >> beam.ParDo(bad_records)
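For context, here is a minimal sketch of how this filter could be wired into the pipeline from the question, assuming the same names (p, known_args, ExtractingDoFn, separate_by_table) and with 'key1' standing in for whatever key your schema expects:

lines = p | ReadFromPubSub(known_args.topic)
datas = lines | beam.ParDo(ExtractingDoFn())
good = datas | 'exclude bad records' >> beam.ParDo(bad_records)
by_table = good | beam.Partition(separate_by_table, 13)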