TL;DR: Looking for a way to update Datastore entities via Dataflow without overwriting existing data.
I'm using Dataflow 2.0.0 (Apache Beam) to update entities in Google Cloud Datastore. My pipeline loads entities from Datastore, updates them, and then writes them back to Datastore (overwriting the existing entities).
However, during the update process I also discover additional entities that may or may not already exist in Datastore. To avoid overwriting the existing ones, I previously loaded all entities from Datastore, grouped them by key, and dropped the newly discovered duplicates before writing.
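Roughly, that dedup step looked like this (a simplified sketch, not my exact code: the method name, the key-to-string trick, and the "keep the first one" choice are just illustrative, and the intermediate steps may need an explicit ProtoCoder for Entity):

import com.google.datastore.v1.Entity;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

// Merge the updated entities with the newly discovered ones and keep a
// single entity per Datastore key, so an existing entity never gets
// clobbered by a freshly discovered duplicate.
static PCollection<Entity> dedupeByKey(PCollection<Entity> existing,
                                       PCollection<Entity> discovered) {
  return PCollectionList.of(existing).and(discovered)
      .apply(Flatten.pCollections())
      // Key each entity by its Datastore key so duplicates group together.
      .apply(ParDo.of(new DoFn<Entity, KV<String, Entity>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          c.output(KV.of(c.element().getKey().toString(), c.element()));
        }
      }))
      .apply(GroupByKey.<String, Entity>create())
      // Emit one entity per key; the real code prefers the entity that was
      // loaded from Datastore over a newly discovered duplicate.
      .apply(ParDo.of(new DoFn<KV<String, Iterable<Entity>>, Entity>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          c.output(c.element().getValue().iterator().next());
        }
      }));
}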
As the number of entities grows, I want to avoid loading all of them into Dataflow and instead read them in batches, starting with the oldest timestamps. The problem is that entities outside the current batch get overwritten when a newly discovered duplicate of them is written.
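For the batched loading, I'm thinking of something along these lines (the kind name "MyEntity", the "updated" property, the 7-day cutoff, and the pipeline variable are all made up for illustration):

import java.util.Date;
import java.util.concurrent.TimeUnit;
import com.google.datastore.v1.Entity;
import com.google.datastore.v1.PropertyFilter.Operator;
import com.google.datastore.v1.Query;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
import org.apache.beam.sdk.values.PCollection;
import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
import static com.google.datastore.v1.client.DatastoreHelper.makeValue;

// Only read entities whose "updated" timestamp is older than a cutoff,
// instead of loading the whole kind into the pipeline.
Date cutoff = new Date(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7));
Query.Builder query = Query.newBuilder();
query.addKindBuilder().setName("MyEntity");
query.setFilter(makeFilter("updated", Operator.LESS_THAN, makeValue(cutoff).build()));

PCollection<Entity> batch = pipeline.apply("ReadBatch",
    DatastoreIO.v1().read().withProjectId("...").withQuery(query.build()));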
I'm writing the entities to Datastore using the following (in two spots, one for existing entities and one for new entities):
collection.apply(DatastoreIO.v1().write().withProjectId("..."))
It would be really nice if there were something like a DatastoreIO.v1().writeNew() method, but sadly it doesn't exist. Thanks for any help.