I have an apache-beam based dataflow job to read using vcf source from a single text file (stored in google cloud storage), transform text lines into datastore Entities
and write them into the datastore sink. The workflow works fine but the cons I noticed is that:
- The write speed into datastore is at most around 25-30 entities per second.
- I tried to use
--autoscalingAlgorithm=THROUGHPUT_BASED --numWorkers=10 --maxNumWorkers=100
but the execution seems to prefer one worker (see graph below: the target workers once increased to 2 but reduced to 1 "based on the ability to parallelize the work in the currently running step").
I did not use ancestor path for the keys; all the entities are the same kind
.
The pipeline code looks like below:
def write_to_datastore(project, user_options, pipeline_options):
"""Creates a pipeline that writes entities to Cloud Datastore."""
with beam.Pipeline(options=pipeline_options) as p:
(p
| 'Read vcf files' >> vcfio.ReadFromVcf(user_options.input)
| 'Create my entity' >> beam.ParDo(
ToEntityFn(), user_options.kind)
| 'Write to datastore' >> WriteToDatastore(project))
Because I have millions of rows to write into the datastore, it would take too long to write with a speed of 30 entities/sec.
Question: The input is just one huge gzipped file. Do I need to split it into multiple small files to trigger multiple workers? Is there any other way I can make the importing faster? Do I miss something in the num_workers
setup? Thanks!