0
votes

For some reason, every time that I try to create a schema-ware PCollection in my pipeline (Apache Beam python), using either beam.Select() or beam.Row() and try to run it on Dataflow, I get a fatal error shown below that causes that the worker to die on the VM. Strangely, I DON'T get this error when I run locally using Directrunner.

enter image description here

I wanted to make sure that it's not some stupid mistake I made in my custom code. I tried with Google's examples, such as the snippet below simplified from: https://github.com/apache/beam/blob/034ccdf93a0c5dfe6629501b456105ac47047e44/sdks/python/apache_beam/examples/sql_taxi.py

import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

PROJECT_ID = <my_project_id>

def run(pipeline_args):
  pipeline_options = PipelineOptions(
      pipeline_args, project=PROJECT_ID, save_main_session=True, streaming=True)

  with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | beam.io.ReadFromPubSub(
            topic='projects/pubsub-public-data/topics/taxirides-realtime',
            timestamp_attribute="ts").with_output_types(bytes)
        | "Parse JSON payload" >> beam.Map(json.loads)
        # Use beam.Row to create a schema-aware PCollection
        # ** This gives me trouble everytime when run through Dataflow **
        | "Create beam Row" >> beam.Map(
            lambda x: beam.Row(
                ride_status=str(x['ride_status']),
                passenger_count=int(x['passenger_count']))))

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  import argparse

  parser = argparse.ArgumentParser()
  _, pipeline_args = parser.parse_known_args()

  run(pipeline_args)

Exactly same issue. Perfectly fine when run locally in Directrunner; crashes when run in Dataflow. I am using apache-beam 2.31.0 python SDK

I have been wrecking my brain for days over this. Any help or guidance would be tremendously appreciated!!

1
This seems to be the issue: issues.apache.org/jira/browse/BEAM-10110. Can you try explicitly defining the schema as NamedTuple instead of using ad-hoc schema declaration with Row: beam.apache.org/documentation/programming-guide/#schemas - 大ドア東
@大ドア東 Thanks! I will give that a shot - yl_elm_city

1 Answers

0
votes

Thanks to @大ドア東, I was able to get it to work on Google Dataflow using NamedTuple.

I am still shocked however that beam.Row() or beam.Select() simply doesn't work on Dataflow for Apache Beam python SDK and I couldn't find similar complaint on stackoverflow.