I am developing a Python 2.7 application that uses the Azure Table Storage service via the Azure Python package. As far as I understand from Azure's REST API documentation, batched operations form an atomic transaction: if one of the operations fails, the whole batch fails and none of the operations are executed.
The problem I have encountered is the following. The method below receives a list of dicts via the "rows" parameter. Some of them have an ETag set (obtained from a previous query).
If the ETag is set, the method attempts a merge operation; otherwise, it attempts an insert operation. Since multiple processes may modify the same entity, the concurrency problem has to be handled via the "if_match" parameter of the merge_entity function. If the merge/insert operations are executed individually (not as part of a batch), the system works as expected and raises an exception if the ETags do not match. Unfortunately, this does not happen if they are wrapped in "begin_batch" / "commit_batch" calls. The entities are merged (wrongly) even though the ETags DO NOT match.
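To make the behaviour I expect concrete, here is a toy in-memory model of it. It is only a sketch under my own assumptions and does not use the Azure package at all: `ToyTable`, `ETagMismatch` and the shape of the `operations` list are hypothetical names invented for illustration. A merge with a non-matching `if_match` raises, and a batch containing such a merge leaves the table untouched.

```python
import copy
import uuid


class ETagMismatch(Exception):
    """Raised when if_match does not equal the stored entity's ETag."""


class ToyTable(object):
    """Hypothetical in-memory stand-in for a table; NOT the Azure SDK."""

    def __init__(self):
        # (PartitionKey, RowKey) -> (etag, entity dict)
        self._rows = {}

    def insert(self, entity):
        key = (entity["PartitionKey"], entity["RowKey"])
        if key in self._rows:
            raise KeyError("entity already exists")
        etag = uuid.uuid4().hex
        self._rows[key] = (etag, dict(entity))
        return etag

    def merge(self, entity, if_match):
        key = (entity["PartitionKey"], entity["RowKey"])
        etag, stored = self._rows[key]
        # "*" means "merge regardless of the stored ETag".
        if if_match != "*" and if_match != etag:
            raise ETagMismatch("stored %s, got %s" % (etag, if_match))
        merged = dict(stored)
        merged.update(entity)
        new_etag = uuid.uuid4().hex
        self._rows[key] = (new_etag, merged)
        return new_etag

    def get(self, partition_key, row_key):
        etag, stored = self._rows[(partition_key, row_key)]
        return etag, dict(stored)

    def commit_batch(self, operations):
        """Apply every (method_name, kwargs) operation, or none of them."""
        snapshot = copy.deepcopy(self._rows)
        try:
            for name, kwargs in operations:
                getattr(self, name)(**kwargs)
        except Exception:
            self._rows = snapshot  # roll back the earlier operations
            raise
```

With this model, committing a batch that contains a merge with `if_match="random_etag"` raises `ETagMismatch` and the stored "Count" stays unchanged, which is what I expected from the real service.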
I have provided below both the code and the test case used. I also ran some manual tests several times, with the same conclusion.
I am unsure of how to approach this problem. Am I doing something wrong or is it an issue with the Python package?
The code used is the following:
    def persist_entities(self, rows):
        success = True
        self._service.begin_batch()  # If commented out, works as expected (fails)
        for row in rows:
            print row
            etag = row.pop("ETag")
            if not etag:
                # No ETag: the entity is new, so insert it.
                self._service.insert_entity(self._name,
                                            entity=row)
            else:
                # ETag present: merge only if the stored ETag still matches.
                print "Merging " + etag
                self._service.merge_entity(self._name,
                                           row["PartitionKey"],
                                           row["RowKey"],
                                           row, if_match=etag)
        try:  # Also tried with the try at the beginning of the code
            self._service.commit_batch()  # If commented out, works as expected (fails)
        except WindowsAzureError:
            print "Failed to merge"
            self._service.cancel_batch()
            success = False
        return success
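One side note on the method above: `row.pop("ETag")` modifies the caller's dicts. This is unrelated to the batch behaviour, but a minimal sketch of classifying the rows without mutating them could look like the following (`split_rows` is a hypothetical helper name, not part of my code or of the Azure package):

```python
def split_rows(rows, etag_key="ETag"):
    """Split rows into inserts and (entity, etag) merges without mutating them."""
    inserts, merges = [], []
    for row in rows:
        entity = dict(row)                 # shallow copy; the caller's dict is untouched
        etag = entity.pop(etag_key, None)  # a missing key is treated like "no ETag"
        if etag:
            merges.append((entity, etag))
        else:
            inserts.append(entity)
    return inserts, merges
```

The inserts would then go to `insert_entity` and each `(entity, etag)` pair to `merge_entity` with `if_match=etag`, exactly as in the loop above.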
The test case used:
    def test_fail_update(self):
        service = self._conn.get_service()
        partition, new_rows = self._generate_data()  # Partition key and list of dicts
        success = self._wrapper.persist_entities(new_rows)  # Inserts a fresh entity
        ok_(success)  # Insert succeeds
        rows = self._wrapper.get_entities_by_row(partition)  # Retrieves inserted data for the ETag
        eq_(len(rows), 1)
        for index in rows:
            row = rows[index]
            data = new_rows[0]
            data["Count"] = 155  # Same data, different value
            data["ETag"] = "random_etag"  # Change ETag to a random string
            val = self._wrapper.persist_entities([data])  # Try to merge
            ok_(not val)  # val is True if the merge succeeded, False if it failed.
            # It is always True when the operations are in a batch, False when they are not.
            rows1 = self._wrapper.get_entities_by_row(partition)
            eq_(len(rows1), 1)
            eq_(rows1[index].Count, 123)
            break

    def _generate_data(self):
        date = datetime.now().isoformat()
        partition = "{0}_{1}_{2}".format("1",
                                         Stats.RESOLUTION_DAY, date)
        data = {
            "PartitionKey": partition,
            "RowKey": "viewitem",
            "Count": 123,
            "ETag": None
        }
        return partition, [data]