For some reason, every time I try to create a schema-aware PCollection in my pipeline (Apache Beam Python), using either beam.Select() or beam.Row(), and then run it on Dataflow, I get a fatal error (shown below) that causes the worker to die on the VM. Strangely, I DON'T get this error when I run locally using DirectRunner.
I wanted to make sure it wasn't some silly mistake in my custom code, so I also tried Google's own examples, such as the snippet below, simplified from: https://github.com/apache/beam/blob/034ccdf93a0c5dfe6629501b456105ac47047e44/sdks/python/apache_beam/examples/sql_taxi.py
import argparse
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

PROJECT_ID = '<my_project_id>'  # placeholder for my actual project ID


def run(pipeline_args):
    pipeline_options = PipelineOptions(
        pipeline_args, project=PROJECT_ID, save_main_session=True,
        streaming=True)
    with beam.Pipeline(options=pipeline_options) as pipeline:
        _ = (
            pipeline
            | beam.io.ReadFromPubSub(
                topic='projects/pubsub-public-data/topics/taxirides-realtime',
                timestamp_attribute="ts").with_output_types(bytes)
            | "Parse JSON payload" >> beam.Map(json.loads)
            # Use beam.Row to create a schema-aware PCollection
            # ** This gives me trouble every time it runs through Dataflow **
            | "Create beam Row" >> beam.Map(
                lambda x: beam.Row(
                    ride_status=str(x['ride_status']),
                    passenger_count=int(x['passenger_count']))))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    parser = argparse.ArgumentParser()
    _, pipeline_args = parser.parse_known_args()
    run(pipeline_args)
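For reference, the beam.Select() variant I tried hits the same wall. Here is a minimal, self-contained sketch of it (the dummy record and step name are just for illustration, not my exact code):

import apache_beam as beam

# Minimal sketch of the beam.Select() variant, with a made-up record.
with beam.Pipeline() as p:  # uses DirectRunner by default
    _ = (
        p
        | beam.Create([{'ride_status': 'enroute', 'passenger_count': 2}])
        # beam.Select also yields a schema-aware PCollection of Rows
        | "Select fields" >> beam.Select(
            ride_status=lambda x: str(x['ride_status']),
            passenger_count=lambda x: int(x['passenger_count']))
        | beam.Map(print))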
Either way, I hit exactly the same issue: everything runs perfectly fine locally with DirectRunner, but crashes when run on Dataflow. I am using the apache-beam 2.31.0 Python SDK.
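To illustrate the DirectRunner side, this stripped-down version of the beam.Row step (bounded input, made-up record) completes without any error locally:

import apache_beam as beam

# Stripped-down local check: the same beam.Row mapping over a dummy record.
with beam.Pipeline() as p:  # uses DirectRunner by default
    _ = (
        p
        | beam.Create([{'ride_status': 'enroute', 'passenger_count': 2}])
        | "Create beam Row" >> beam.Map(
            lambda x: beam.Row(
                ride_status=str(x['ride_status']),
                passenger_count=int(x['passenger_count'])))
        | beam.Map(print))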
I have been racking my brain over this for days. Any help or guidance would be tremendously appreciated!
