
I have a use case where I need to write terabytes of data to Spanner. We extract the data from DynamoDB and export it to Google Cloud Storage in bzip2 format. The Spanner table is keyed by primary ID, and rows that already exist in Spanner must be left untouched. I wrote the code below to achieve this.

    Mutation.WriteBuilder mutation = Mutation.newInsertBuilder(spannerTable.get());

I use an insert builder because I don't want to update existing rows in Spanner. The rows are then written to Spanner with the following code, which sets the failure mode:

    results2.apply("Write Mutations to Spanner", SpannerIO.write()
            .withInstanceId(spannerInstanceId)
            .withDatabaseId(spannerDatabaseId)
            //.withBatchSizeBytes(2000000)
            //.withMaxNumMutations(maxNumMutations)
            .withFailureMode(FailureMode.REPORT_FAILURES));

The problem is that Dataflow retries the entire batch whenever one mutation in it fails with 'ALREADY_EXISTS'. I can't use FailureMode.FAIL_FAST, because it stops the entire pipeline. I also tried lowering withMaxNumMutations to shrink the batch size (to reduce the probability of an 'ALREADY_EXISTS' record landing in any given batch), but overall performance suffers. Is there any way to stop the retry mechanism for mutations that fail with 'ALREADY_EXISTS'?


1 Answer


You should use an InsertOrUpdate mutation instead of an Insert mutation. It does what you need here: it inserts the row if it is not already there, and otherwise updates it. In your case the update does nothing useful, since no values should change, but it no longer fails with 'ALREADY_EXISTS', so the batch is not retried and your pipeline does not break.

Example:

    // Inserts the row if its key is absent; otherwise overwrites the listed columns.
    Mutation mutation = Mutation.newInsertOrUpdateBuilder("FOO")
        .set("ID").to(1L)
        .set("BAR").to("BAZ")
        .build();

The same example appears in the official Spanner documentation.
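To illustrate why switching the mutation type avoids the batch failure, here is a minimal toy model in plain Java. It is not the Spanner client or SpannerIO: the `Table` class and its two methods are hypothetical names invented for this sketch, and it assumes only that the mutations committed together in one batch apply atomically.

```java
import java.util.HashMap;
import java.util.Map;

public class MutationSemanticsSketch {

    /** Hypothetical in-memory stand-in for a table keyed by primary ID. */
    static final class Table {
        final Map<Long, String> rows = new HashMap<>();

        /** Insert semantics: one existing key rejects the whole batch. */
        boolean applyInsertBatch(Map<Long, String> batch) {
            for (Long id : batch.keySet()) {
                if (rows.containsKey(id)) {
                    return false; // models ALREADY_EXISTS; nothing is written
                }
            }
            rows.putAll(batch);
            return true;
        }

        /** Insert-or-update semantics: existing keys are overwritten, so the batch applies. */
        boolean applyInsertOrUpdateBatch(Map<Long, String> batch) {
            rows.putAll(batch);
            return true;
        }
    }

    public static void main(String[] args) {
        Table table = new Table();
        table.rows.put(1L, "BAZ"); // row 1 was loaded by an earlier run

        Map<Long, String> batch = new HashMap<>();
        batch.put(1L, "BAZ"); // duplicate of the existing row
        batch.put(2L, "QUX"); // genuinely new row

        System.out.println("insert batch applied: " + table.applyInsertBatch(batch));
        System.out.println("insert-or-update batch applied: " + table.applyInsertOrUpdateBatch(batch));
        System.out.println("rows: " + table.rows.size());
    }
}
```

In this model the duplicate key blocks the new row under insert semantics, while insert-or-update writes both. The trade-off is that insert-or-update overwrites the listed columns of an existing row, so it only fits when the incoming values match what is already stored, as in the question.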