TLDR: Looking for a way to update Datastore entities without overwriting existing data via Dataflow

I'm using Dataflow 2.0.0 (Beam) to update entities in Google Datastore. My pipeline loads entities from Datastore, updates them, and then saves them back into Datastore (overwriting the existing entities).

However, during the update process I also discover additional entities that may or may not already exist. To avoid overwriting existing entities, I previously loaded all the entities from Datastore and reduced them (group by key), dropping any newly discovered entity whose key already existed.
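
The group-by-key reduction described above can be modelled outside Beam with plain collections. This is only an illustrative sketch, assuming each entity is reduced to a key string and a value string; the class and method names are hypothetical, and in a real pipeline the same "existing value wins" rule would be applied per key after a GroupByKey.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class KeepExistingMerge {
    /**
     * Merges newly discovered entries into the existing ones.
     * When a key is present in both maps, the existing value wins.
     */
    static Map<String, String> merge(Map<String, String> existing,
                                     Map<String, String> discovered) {
        Map<String, String> merged = new LinkedHashMap<>(existing);
        for (Map.Entry<String, String> e : discovered.entrySet()) {
            // putIfAbsent leaves an already-present key untouched.
            merged.putIfAbsent(e.getKey(), e.getValue());
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> existing = new LinkedHashMap<>();
        existing.put("a", "stored");

        Map<String, String> discovered = new LinkedHashMap<>();
        discovered.put("a", "fresh"); // duplicate of an existing key
        discovered.put("b", "fresh"); // genuinely new key

        System.out.println(merge(existing, discovered)); // prints {a=stored, b=fresh}
    }
}
```

The rule only protects keys that were actually loaded into the merge, which is why it stops working once the existing entities are read in batches.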

As the number of entities grows, I want to avoid having to load all entities into Dataflow (instead taking them in batches based on oldest timestamps), but I'm coming across the problem that old entities are getting overwritten when they are not in the current batch.


I'm writing the entities to Datastore using the following (in two places: one for existing entities and one for new entities):

collection.apply(DatastoreIO.v1().write().withProjectId("..."))

It would be really nice if there was something like a DatastoreIO.v1().writeNew() method but sadly it doesn't exist. Thank you for any help.

It is not clear to me what you are trying to do: at one point you say you want to overwrite entities, and at another that you want to prevent overwriting them. Can you clarify? Also, how are you loading your entities in batches? - Vikas Kedigehalli
Yes, we totally should have that exposed via Dataflow. The service supports Insert, Upsert, and Update - but we currently only expose Upsert in Dataflow. - Dan McGrath
The semantics of non-idempotent writes require extra thought about how retries are handled, given that Dataflow keeps retrying until every record is successfully written. We could end up in a scenario where some writes succeeded but the Dataflow job failed, and any future retries of the job never succeed. - Vikas Kedigehalli
@VikasKedigehalli Batches basically by selecting entities older than X time OR selecting the Y oldest entities. And yes, I basically want to do entity updates instead of overwrites/upserts - Mingwei Samuel
@MingweiSamuel Can you clarify what you mean by "update Datastore entities without overwriting existing data"? According to Datastore documentation cloud.google.com/datastore/docs/concepts/…, updating an entity is basically an overwrite. - Vikas Kedigehalli
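
To make the Insert / Upsert / Update distinction from the comments above concrete, here is a minimal in-memory model. It is a sketch under stated assumptions, not the Datastore client API: the class `WriteModes` and its methods are hypothetical, and a `HashMap` stands in for the datastore. Insert succeeds only when the key is absent, update only when it is present, and upsert always writes.

```java
import java.util.HashMap;
import java.util.Map;

final class WriteModes {
    // Hypothetical stand-in for the datastore: key name -> stored value.
    private final Map<String, String> store = new HashMap<>();

    /** Insert: writes only if the key does not exist yet; returns false otherwise. */
    boolean insert(String key, String value) {
        return store.putIfAbsent(key, value) == null;
    }

    /** Update: writes only if the key already exists; returns false otherwise. */
    boolean update(String key, String value) {
        return store.replace(key, value) != null;
    }

    /** Upsert: always writes, replacing any existing value. */
    void upsert(String key, String value) {
        store.put(key, value);
    }

    String get(String key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        WriteModes db = new WriteModes();
        System.out.println(db.insert("k", "first"));  // true: key was absent
        System.out.println(db.insert("k", "retry"));  // false: a retried insert is rejected
        db.upsert("k", "second");                     // a retried upsert just rewrites the value
        System.out.println(db.get("k"));              // second
    }
}
```

The rejected second insert is the retry problem raised above: if a bundle is retried after some of its inserts already committed, those inserts fail again on every attempt, whereas an upsert can be repeated safely.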

1 Answer

If you want to write a new entity that does not yet exist in Datastore, create it with a new key and write it.

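import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
import static com.google.datastore.v1.client.DatastoreHelper.makeValue;

import com.google.datastore.v1.Entity;
import com.google.datastore.v1.Key;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

List<String> keyNames = Arrays.asList("L1", "L2"); // Assume these are the new key names to store
PTransform<PCollection<Entity>, ?> write =
        DatastoreIO.v1().write().withProjectId(project_id); // A typical write (applied as an upsert)

p.apply("GetInMemory", Create.of(keyNames)).setCoder(StringUtf8Coder.of()) // L1 and L2 are loaded
    .apply("Proc1", ParDo.of(new DoFn<String, Entity>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            Key.Builder key = makeKey("k2", c.element()); // Build a key of kind "k2" from the key name
            final Entity entity = Entity.newBuilder()
                    .setKey(key) // Set the key
                    .putProperties("p1", makeValue("test constant value")
                            .setExcludeFromIndexes(true).build()) // Unindexed string property
                    .build();
            c.output(entity);
        }
    }))
    .apply(write); // Write the entities
p.run();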

The complete code is available in my repository at https://github.com/yiu31802/gcp-project/commit/cc224b34