I am using the Task Queue API to send multiple emails in small groups with Mailgun. My code looks more or less like this:

    from google.appengine.ext import ndb


    class CpMsg(ndb.Model):
        group = ndb.KeyProperty()
        sent = ndb.BooleanProperty()
        # Other properties


    def send_mail(messages):
        """Sends a request to Mailgun's API."""
        # Some code
        pass


    class MailTask(TaskHandler):
        def post(self):
            p_key = utils.key_from_string(self.request.get('p'))
            # Non-ancestor query: its results are only eventually consistent.
            msgs = CpMsg.query(
                CpMsg.group == p_key,
                CpMsg.sent == False).fetch(BATCH_SIZE)

            if msgs:
                send_mail(msgs)

                for msg in msgs:
                    msg.sent = True

                ndb.put_multi(msgs)

                # Call the task again in COOLDOWN seconds

The code above has been working fine, but according to the docs, the Task Queue API only guarantees that a task is delivered at least once, so tasks should be idempotent. Most of the time the code above would be idempotent, since it only fetches messages whose 'sent' property is False. The problem is that non-ancestor ndb queries are only eventually consistent, so if the task runs twice in quick succession, the second query may return stale results that still include the messages that were just sent.
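To make the failure mode concrete, here is a toy, in-memory model in plain Python (not App Engine code). ToyDatastore, mail_task and the separate index dict are hypothetical names invented for this sketch, and the lagging index is a simplification of eventual consistency: key lookups always see the latest write, while the index that queries read from only catches up when apply_index() is called. Running the question's query, send, mark-as-sent logic twice before the index catches up sends every message twice:

```python
# Toy in-memory model, NOT App Engine code: all names here are hypothetical.
class ToyDatastore:
    def __init__(self):
        self.entities = {}  # key -> {"sent": bool}; always up to date
        self.index = {}     # key -> sent flag as seen by queries; may be stale

    def put(self, key, sent):
        # Writes update the entity immediately; the index is left untouched
        # until apply_index() runs, simulating eventual consistency.
        self.entities[key] = {"sent": sent}

    def apply_index(self):
        # Let queries catch up with all writes made so far.
        self.index = {k: v["sent"] for k, v in self.entities.items()}

    def query_unsent(self, limit):
        # Like the question's non-ancestor query: reads the (possibly stale) index.
        return [k for k, sent in sorted(self.index.items()) if not sent][:limit]


sent_log = []  # every message "sent" by the hypothetical send_mail() below


def send_mail(keys):
    sent_log.extend(keys)


def mail_task(store, batch_size):
    """Same shape as the question's handler: query unsent, send, mark as sent."""
    keys = store.query_unsent(batch_size)
    if keys:
        send_mail(keys)
        for key in keys:
            store.put(key, sent=True)


store = ToyDatastore()
for key in ["m1", "m2", "m3"]:
    store.put(key, sent=False)
store.apply_index()  # the new messages are now visible to queries

mail_task(store, batch_size=10)  # first delivery of the task
mail_task(store, batch_size=10)  # second delivery, before the index catches up
print(sent_log)  # ['m1', 'm2', 'm3', 'm1', 'm2', 'm3']
```

Once the index has caught up, a further run finds nothing to send, which is why the duplicates only show up when deliveries happen in quick succession.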

I thought of giving the messages a common ancestor, but since there will be thousands of sent emails, I'm worried that this would create large entity groups, which have limited write throughput.

Should I use an ancestor for the queries? Or is there perhaps a way to configure Mailgun to avoid sending the same email twice? Or should I just accept the risk that, in some rare cases, a few emails may be sent more than once?

1 Answer

One possible way around the eventual consistency hurdle is to make the query keys_only, then fetch the actual messages by key lookup (which is strongly consistent), check msg.sent, and skip any message for which it is already True. Something along these lines:

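    # Keys-only, non-ancestor query: the key list itself may still be stale.
    msg_keys = CpMsg.query(
        CpMsg.group == p_key,
        CpMsg.sent == False).fetch(BATCH_SIZE, keys_only=True)
    if not msg_keys:
        return

    # Lookups by key are strongly consistent, so msg.sent is up to date here.
    msgs = ndb.get_multi(msg_keys)
    msgs_to_send = []

    for msg in msgs:
        # Skip deleted entities (None) and messages already marked as sent.
        if msg is not None and not msg.sent:
            msgs_to_send.append(msg)

    if msgs_to_send:
        send_mail(msgs_to_send)

        for msg in msgs_to_send:
            msg.sent = True

        ndb.put_multi(msgs_to_send)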
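The keys-only-then-lookup pattern above can be illustrated with a toy, in-memory model in plain Python (hypothetical names, not App Engine code). Here query_unsent_keys() plays the role of the keys_only query and reads a possibly stale index, while get_multi() plays the role of ndb.get_multi() and always sees the latest write. The second run still gets stale keys from the query, but the re-check by key filters them out:

```python
# Toy in-memory model, NOT App Engine code: all names here are hypothetical.
class ToyDatastore:
    def __init__(self):
        self.entities = {}  # key -> {"sent": bool}; what lookups by key see
        self.index = {}     # key -> sent flag as seen by queries; may be stale

    def put(self, key, sent):
        self.entities[key] = {"sent": sent}  # index deliberately not updated

    def apply_index(self):
        # Let queries catch up with all writes made so far.
        self.index = {k: v["sent"] for k, v in self.entities.items()}

    def query_unsent_keys(self, limit):
        # Stand-in for the keys_only query: reads the possibly stale index.
        return [k for k, sent in sorted(self.index.items()) if not sent][:limit]

    def get_multi(self, keys):
        # Stand-in for ndb.get_multi(): lookups by key see the latest writes.
        return [(k, self.entities.get(k)) for k in keys]


def mail_task(store, batch_size, outbox):
    keys = store.query_unsent_keys(batch_size)
    if not keys:
        return
    # Re-check every message by key and skip the ones already marked as sent.
    to_send = [k for k, ent in store.get_multi(keys)
               if ent is not None and not ent["sent"]]
    if to_send:
        outbox.extend(to_send)  # stands in for send_mail(msgs_to_send)
        for k in to_send:
            store.put(k, sent=True)


store = ToyDatastore()
for key in ["m1", "m2", "m3"]:
    store.put(key, sent=False)
store.apply_index()

outbox = []
mail_task(store, batch_size=10, outbox=outbox)  # first delivery
mail_task(store, batch_size=10, outbox=outbox)  # duplicate delivery, stale index
print(outbox)  # ['m1', 'm2', 'm3']
```

This only covers staleness between runs that happen one after the other. Two runs executing at the same time could both read sent == False before either of them writes, which is the gap a transaction is meant to close.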

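You'd also have to run the lookup, check and update steps in a transaction (for example with the @ndb.transactional(xg=True) decorator, since messages without a common ancestor live in different entity groups), so that two concurrent runs of the task can't both see sent == False. The keys-only query itself has to stay outside the transaction, because ndb only allows ancestor queries inside transactions, and since a cross-group transaction can only span a limited number of entity groups, BATCH_SIZE has to stay within that limit.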

This should address the duplicates caused by the query's eventual consistency. However, there is still room for duplicates caused by transaction retries due to datastore contention (or any other reason), because send_mail() is an external call that isn't idempotent and isn't undone when the transaction is retried. Sending one message at a time (maybe using the task queue) could reduce the chance of that happening. See also GAE/P: Transaction safety with API calls
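The retry problem can also be sketched with a toy model in plain Python. run_in_transaction() and ContentionError below are hypothetical stand-ins, not ndb's actual implementation: the helper simply re-runs the body when a simulated commit fails, which is the behaviour that matters here, since an external call such as send_mail() is not rolled back by the failed attempt. It also shows why smaller units of work help: with one transaction per message, a retry re-sends only that message instead of the whole batch.

```python
class ContentionError(Exception):
    """Hypothetical error raised when the toy transaction gives up."""


def run_in_transaction(body, failed_commits, retries=3):
    """Toy retry loop: the first `failed_commits` attempts 'fail to commit'.

    The body is simply run again after each failed attempt, side effects
    included, which is what makes a non-idempotent send_mail() risky.
    Returns the number of times the body ran.
    """
    for attempt in range(retries + 1):
        body()
        if attempt >= failed_commits:
            return attempt + 1  # this attempt's commit succeeded
    raise ContentionError("gave up after %d attempts" % (retries + 1))


batch = ["m1", "m2", "m3"]

# One transaction for the whole batch; its first commit hits contention.
outbox_batch = []
run_in_transaction(lambda: outbox_batch.extend(batch), failed_commits=1)
print(outbox_batch)  # ['m1', 'm2', 'm3', 'm1', 'm2', 'm3']

# One transaction per message; only m2's first commit hits contention.
outbox_single = []
for key in batch:
    run_in_transaction(lambda key=key: outbox_single.append(key),
                       failed_commits=1 if key == "m2" else 0)
print(outbox_single)  # ['m1', 'm2', 'm2', 'm3']
```

Per-message transactions don't remove the duplicate, they only limit it to the single message whose commit failed.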