
I want to upsert entities to Datastore using Apache Beam. Before the WriteToDatastore step, I created a custom DoFn that:

- takes an entity from the import step,
- checks whether the entity already exists in Datastore,
- if it does, extracts the value of a property and concatenates it with the new entity's value,
- outputs the entity.

Example: my data consists of the columns parent_id, nationality and child_name. The input data are newly born children. Before upserting to Datastore, I want to get the parent's existing children and append the new value.
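Stripped of Beam and Datastore, the append rule described above is plain string concatenation with `;` as the separator. A minimal sketch, assuming a parent that does not exist yet is represented as `None`; the function name `merge_child_names` is hypothetical:

```python
def merge_child_names(existing, new, sep=";"):
    """Append a newly born child's name to a parent's stored names.

    existing: the stored child_name value, or None/"" if the parent
              entity does not exist yet (assumption).
    new: the child_name value from the input row.
    """
    if not existing:
        return new
    return existing + sep + new


print(merge_child_names("Anna;Ben", "Cara"))  # → Anna;Ben;Cara
print(merge_child_names(None, "Cara"))        # → Cara
```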

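import apache_beam as beam
# Legacy (v1, protobuf-based) Datastore connector
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore

# options, input_file_path and PROJECT are defined elsewhere in the script
with beam.Pipeline(options=options) as p:
    (p
     | 'Reading input file' >> beam.io.ReadFromText(input_file_path)
     | 'Converting from csv to dict' >> beam.ParDo(CSVtoDict())
     | 'Create entities' >> beam.ParDo(CreateEntities())
     | 'Update entities' >> beam.ParDo(UpdateEntities())
     | 'Write entities into Datastore' >> WriteToDatastore(PROJECT)
     )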
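`CSVtoDict` is not shown in the question. Assuming each input line holds `parent_id,nationality,child_name` in that order with no header row, its parsing logic could look like the sketch below; the helper name `csv_line_to_dict` and the column order are assumptions, and inside the DoFn's `process` you would `yield` the returned dict:

```python
import csv

FIELDS = ("parent_id", "nationality", "child_name")  # assumed column order


def csv_line_to_dict(line, fieldnames=FIELDS):
    """Parse one CSV line into a dict keyed by column name."""
    # csv.reader (rather than line.split(",")) handles quoted fields,
    # e.g. names that contain commas.
    row = next(csv.reader([line]))
    if len(row) != len(fieldnames):
        raise ValueError("expected %d columns, got %d: %r"
                         % (len(fieldnames), len(row), line))
    return dict(zip(fieldnames, row))


print(csv_line_to_dict('42,DE,"Smith, Anna"'))
# → {'parent_id': '42', 'nationality': 'DE', 'child_name': 'Smith, Anna'}
```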

The ParDo that takes the most time is UpdateEntities:

from apache_beam.io.gcp.datastore.v1 import helper
from google.cloud.proto.datastore.v1 import entity_pb2, query_pb2
from googledatastore import helper as datastore_helper, PropertyFilter

class UpdateEntities(beam.DoFn):
    """Appends the new child's name to the parent's existing child_name, if any."""
    def process(self, element):
        # Parent key: namespace = nationality, path = (kind, parent_id).
        # `kind` and PROJECT are module-level constants defined elsewhere.
        parent_key = entity_pb2.Key()
        parent = datastore_helper.get_value(element.properties['parent_id'])
        datastore_helper.add_key_path(parent_key, kind, parent)
        parent_key.partition_id.namespace_id = datastore_helper.get_value(element.properties['nationality'])

        # Query for the entity whose __key__ equals the parent key
        query = query_pb2.Query()
        query.kind.add().name = kind
        datastore_helper.set_property_filter(query.filter, '__key__', PropertyFilter.EQUAL, parent_key)

        # One Datastore RPC for every element
        req = helper.make_request(project=PROJECT, namespace=parent_key.partition_id.namespace_id, query=query)
        resp = helper.get_datastore(PROJECT).run_query(req)

        if resp.batch.entity_results:
            existing_entity = resp.batch.entity_results[0].entity
            existing_child_names = datastore_helper.get_value(existing_entity.properties['child_name'])
            new_child_names = existing_child_names + ';' + datastore_helper.get_value(element.properties['child_name'])
            datastore_helper.set_value(element.properties['child_name'], new_child_names)
        # Emit the (possibly updated) entity in both cases
        yield element

1 Answer


It's not surprising that UpdateEntities is the slowest part of your Beam pipeline: it makes an RPC to Datastore on every call to process, i.e. once per input element. As long as UpdateEntities makes an RPC per element, it will remain the slowest step of your job. Also, to fetch an entity by its key you should use a lookup (get) instead of a query on `__key__`, because queries on keys are eventually consistent, whereas lookups by key are strongly consistent.
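One way to reduce the number of RPCs, sketched here under assumptions, is to batch elements (for example by inserting `beam.BatchElements()` before this step, so each element reaching the DoFn is a list) and issue a single key lookup per batch instead of one query per element. The sketch keeps Beam and Datastore out of the picture so the merge logic runs on its own: `merge_batch` and `lookup_many` are hypothetical names, rows are plain dicts rather than entity protobufs, and in a real pipeline `lookup_many` would wrap one get-by-key call for the whole batch (e.g. `get_multi` on a `google.cloud.datastore.Client`). Rows for the same parent are grouped first, so each key is looked up once per batch:

```python
def merge_batch(elements, lookup_many, sep=";"):
    """Merge a batch of new-child rows with the parents already stored.

    elements: list of dicts with 'parent_id', 'nationality', 'child_name'.
    lookup_many: hypothetical callable taking a list of keys and returning
        {key: stored child_name} for the keys that exist. In a real pipeline
        it would wrap ONE Datastore lookup (get-by-key) call for the batch.
    Returns one merged dict per distinct parent, in first-seen order.
    """
    # Key = (namespace, parent_id), mirroring namespace=nationality above.
    grouped = {}
    for e in elements:
        key = (e["nationality"], e["parent_id"])
        grouped.setdefault(key, []).append(e["child_name"])

    # A single lookup for every distinct parent key in the batch.
    existing = lookup_many(list(grouped))

    merged = []
    for (nationality, parent_id), new_names in grouped.items():
        stored = existing.get((nationality, parent_id))
        names = ([stored] if stored else []) + new_names
        merged.append({"parent_id": parent_id,
                       "nationality": nationality,
                       "child_name": sep.join(names)})
    return merged


# Usage with an in-memory stand-in for Datastore.
store = {("DE", "42"): "Anna"}
calls = []

def fake_lookup(keys):
    calls.append(keys)
    return {k: store[k] for k in keys if k in store}

rows = [
    {"parent_id": "42", "nationality": "DE", "child_name": "Ben"},
    {"parent_id": "7", "nationality": "FR", "child_name": "Luc"},
    {"parent_id": "42", "nationality": "DE", "child_name": "Cara"},
]
print(merge_batch(rows, fake_lookup))
# → [{'parent_id': '42', 'nationality': 'DE', 'child_name': 'Anna;Ben;Cara'},
#    {'parent_id': '7', 'nationality': 'FR', 'child_name': 'Luc'}]
print(len(calls))  # → 1
```

This still makes one RPC per batch, so it reduces rather than removes the cost described above. Grouping also means two new children of the same parent within one batch are combined instead of overwriting each other; it does not protect against the same parent appearing in different batches.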