
I designed an Apache Beam / Dataflow pipeline using the Beam Python SDK. The pipeline roughly does the following (sketched below):

  1. ParDo: Collect JSON data from an API
  2. ParDo: Transform JSON data
  3. I/O: Write transformed data to BigQuery Table
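
Roughly, the structure looks like this (a simplified sketch only; the DoFn implementations, table name and schema are placeholders, not my actual code):

    import apache_beam as beam

    class CallApiFn(beam.DoFn):
        """Placeholder for the DoFn that collects JSON data from the API."""
        def process(self, element):
            yield {'id': 1, 'value': 'example'}  # would yield one dict per JSON document

    class TransformJsonFn(beam.DoFn):
        """Placeholder for the DoFn that transforms the JSON data."""
        def process(self, element):
            yield element  # would reshape the record into a BigQuery row

    with beam.Pipeline() as p:
        (p
         | 'Create Dummy Element' >> beam.Create([None])
         | 'Call API' >> beam.ParDo(CallApiFn())
         | 'Transform JSON' >> beam.ParDo(TransformJsonFn())
         | 'Write to Bigquery' >> beam.io.WriteToBigQuery(
               'my-project:my_dataset.my_table',   # placeholder table spec
               schema='id:INTEGER,value:STRING',   # placeholder schema
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))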

Generally, the code does what it is supposed to do. However, when collecting a large dataset from the API (around 500,000 JSON files), the BigQuery insert job stops right after it has been started (within one second) without a specific error message when using the DataflowRunner (it works with the DirectRunner executed on my computer). With a smaller dataset, everything works just fine.

Dataflow log is as follows:

2019-04-22 (00:41:29) Executing BigQuery import job "dataflow_job_14675275193414385105". You can check its status with the bq tool: "bq show -j --project_id=X dataflow_job_14675275193414385105".
2019-04-22 (00:41:29) Workflow failed. Causes: S01:Create Dummy Element/Read+Call API+Transform JSON+Write to Bigquery /WriteToBigQuery/NativeWrite failed., A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service. The work item was attempted on:
beamapp-X-04212005-04211305-sf4k-harness-lqjg,
beamapp-X-04212005-04211305-sf4k-harness-lgg2,
beamapp-X-04212005-04211305-sf4k-harness-qn55,
beamapp-X-04212005-04211305-sf4k-harness-hcsn

Using the bq CLI tool as suggested to get more information about the BigQuery load job does not work: the job cannot be found (and I doubt it was created at all, given the instant failure).

I suppose I am running into some kind of quota / BigQuery restriction, or even an out-of-memory issue (see: https://beam.apache.org/documentation/io/built-in/google-bigquery/):

> Limitations
>
> BigQueryIO currently has the following limitations.
>
> You can’t sequence the completion of a BigQuery write with other steps of your pipeline.
>
> If you are using the Beam SDK for Python, you might have import size quota issues if you write a very large dataset. As a workaround, you can partition the dataset (for example, using Beam’s Partition transform) and write to multiple BigQuery tables. The Beam SDK for Java does not have this limitation as it partitions your dataset for you.

I'd appreciate any hints on how to narrow down the root cause of this issue.

I'd also like to try out a PartitionFn, but did not find any Python source code examples of how to write a partitioned PCollection to BigQuery tables.


2 Answers

3 votes

One thing that might help with debugging is looking at the Stackdriver logs.

If you pull up the Dataflow job in the Google console and click on LOGS in the top right corner of the graph panel, that should open the logs panel at the bottom. The top right of the LOGS panel has a link to Stackdriver. This will give you a lot of logging information about your workers/shuffles/etc. for this particular job.

There's a lot in it, and it can be hard to filter out what's relevant, but hopefully you're able to find something more helpful than "A work item was attempted 4 times without success". For instance, each worker occasionally logs how much memory it is using, which can be compared to the amount of memory each worker has (based on the machine type) to see if they are indeed running out of memory, or if your error is happening elsewhere.
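
If digging through the console is cumbersome, you can also pull those worker logs programmatically with the google-cloud-logging client. A rough sketch (the project id, the job id and the "memory" search term below are placeholders you would need to adapt):

    from google.cloud import logging as cloud_logging

    client = cloud_logging.Client(project='your-project-id')  # placeholder project

    # Worker logs for a single Dataflow job, restricted to entries mentioning memory.
    log_filter = (
        'resource.type="dataflow_step" '
        'resource.labels.job_id="2019-04-21_13_05_00-1234567890"'  # placeholder job id
        ' "memory"'
    )

    for entry in client.list_entries(filter_=log_filter, page_size=100):
        print(entry.timestamp, entry.payload)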

Good luck!

1 vote

As far as I know, there is no available option to diagnose OOM issues in Cloud Dataflow with the Apache Beam Python SDK (it is possible with the Java SDK). I recommend opening a feature request in the Cloud Dataflow issue tracker to get more details on this kind of issue.

In addition to checking the Dataflow job log files, I recommend monitoring your pipeline with the Stackdriver Monitoring tool, which provides resource usage per job (such as Total memory usage time).

Regarding the use of the Partition function in the Python SDK, the following code (based on the sample provided in Apache Beam's documentation) splits the data into 3 BigQuery load jobs:

    import apache_beam as beam

    def partition_fn(element, num_partitions):
        # Map each element to a bucket in [0, num_partitions);
        # get_percentile is a placeholder taken from the documentation sample.
        return int(get_percentile(element) * num_partitions / 100)

    partitions = input_data | beam.Partition(partition_fn, 3)

    for x in range(3):
        partitions[x] | 'WritePartition %s' % x >> beam.io.WriteToBigQuery(
            table_spec,
            schema=table_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
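
Note that get_percentile comes from the documentation sample and is not defined here; any function that maps an element to a bucket index in [0, num_partitions) will do. For example, a minimal stand-in (my own suggestion, not from the Beam docs) that simply spreads the rows evenly over the three load jobs:

    import random

    def partition_fn(element, num_partitions):
        # Assign each row to a random bucket so the three BigQuery load jobs
        # each receive roughly a third of the data.
        return random.randint(0, num_partitions - 1)

If you also hit per-table or per-load-job limits, you could additionally vary the destination (e.g. a different table_spec per partition) to write to multiple BigQuery tables, which is the workaround the Beam documentation describes.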