I'm just getting started with Dataflow / Apache Beam. I wrote a Dataflow pipeline in Python that imports a large catalog of products (> 50K products, stored in a JSON file) into Datastore. The pipeline runs fine on my local machine with the DirectRunner, but fails on the DataflowRunner with the following error message:
RPCError: datastore call commit [while running 'write to datastore/Write Mutation to Datastore'] failed: Error code: INVALID_ARGUMENT. Message: datastore transaction or write too big.
My guess would be that Datastore can't handle the write rate of the pipeline, but I'm not sure how to throttle the writes from the Dataflow pipeline.
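Since the error says "write too big", I also wonder whether a single product entity could be exceeding the per-entity size limit (about 1 MiB) rather than the write rate being the problem. I was thinking of adding a step like the one below right before the write to log entity sizes. This is just a rough sketch I haven't run on Dataflow yet, and it assumes the entities produced by make_entity are protobuf messages (so ByteSize() is available):

import logging

def log_entity_size(entity):
    # Assumes entity is a Datastore Entity protobuf message.
    size = entity.ByteSize()
    if size > 500 * 1024:
        # Anything close to the ~1 MiB per-entity limit looks suspicious.
        logging.warning('Large entity (%d bytes): %s', size, entity.key)
    return entity

# Planned usage, between my existing steps:
#   ... | 'create entity' >> beam.Map(...)
#       | 'log entity size' >> beam.Map(log_entity_size)
#       | 'write to datastore' >> WriteToDatastore(known_args.dataset)

Would that even be the right direction, or is this purely a throttling issue?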
I'm using the WriteToDatastore transform to write to Datastore:
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
My pipeline looks like this:
with beam.Pipeline(options=pipeline_options) as p:
    (p  # pylint: disable=expression-not-assigned
     | 'read from json' >> ReadFromText(known_args.input, coder=JsonCoder())
     | 'create entity' >> beam.Map(
         EntityWrapper(known_args.namespace, known_args.kind,
                       known_args.ancestor).make_entity)
     | 'write to datastore' >> WriteToDatastore(known_args.dataset))
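In case it matters, my EntityWrapper is essentially the one from the Beam datastore_wordcount example, adapted for products. A simplified sketch from memory, so the exact imports and helper calls may differ slightly from what I actually have:

import uuid

from google.cloud.proto.datastore.v1 import entity_pb2
from googledatastore import helper as datastore_helper


class EntityWrapper(object):
    """Turns one decoded JSON product into a Datastore Entity proto."""

    def __init__(self, namespace, kind, ancestor):
        self._namespace = namespace
        self._kind = kind
        self._ancestor = ancestor

    def make_entity(self, product):
        entity = entity_pb2.Entity()
        if self._namespace is not None:
            entity.key.partition_id.namespace_id = self._namespace
        # Every product gets a random UUID name under the same ancestor key.
        datastore_helper.add_key_path(entity.key, self._kind, self._ancestor,
                                      self._kind, str(uuid.uuid4()))
        # The whole product dict is stored as entity properties.
        datastore_helper.add_properties(entity, product)
        return entity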
Thanks in advance for your help.