2
votes

I have a pull queue being serviced by a backend and when the queue is empty I need to trigger another script.

At the moment I am using a very crude detection in the method that leases the tasks from the queue, so that if the task list returned is empty we presume that there are no more to lease and trigger the next step. However, while this works most of the time, occasionally a lease request seems to return an empty list even though there are tasks available.

Anyway, the better way to do it I think is to use the fetch_statistics method of the Queue. That way the script can monitor whats going on in the pull queue and know that there are no more items left in the queue. Now this is obviously available via the REST api for queues, but it seems rather backward to use this when I am using these internally.

So I am making the Queue.fetch_statistics() call, but it throws an error. I've tried putting the stated error into Google, but it returns nothing. Same here on stackoverflow.

It always throws:

AttributeError: type object 'QueueStatistics' has no attribute '_QueueStatistics__TranslateError'

My code is:

    q = taskqueue.Queue('reporting-pull')
    try:
        logging.debug(q.fetch_statistics())
    except Exception, e:
        logging.exception(e)

Can anyone shed any light on this? I am I doing something really stupid here?

3
AFAIK Queue doesn't have a fetch_statistics method - Shay Erlichmen
It does, well its certainly in the code sdk, but its not documented. It is the method used that is JSONified for the REST API service. In taskqueue.py line 1810 def fetch_statistics(self): """Get the current details about this queue. Returns: A QueueStatistics instance containing information about this queue. """ return QueueStatistics.fetch(self) Also the exception thrown is relating to the object returned from the method and not the method itself not existing. It might well be slightly different at runtime. - Justin Grayston
working with undocumented api, where are any moment the provider can cut you off and break you is no the wisest thing todo. - Shay Erlichmen
Its only undocumented on code.google.com/appengine/docs/python/taskqueue and quite a few things are not included or strictly accurate. The code itself is being used by the REST API so unlikely to be cut off, the feature I recall being mentioned on the PYTHON Group a while back too. I only need it to run for the rest of March anyway, so its not like I am relying on it as part of a core longterm application function. - Justin Grayston

3 Answers

2
votes

The Task Queue Statistics API is now documented and publicly available. The error no longer occurs.

3
votes

Just incase it is useful to anyone else, here is an example function to get you started getting queue info from your app. Its only an example, and could do with better error handling, but it should get you up and running. Previously we have used the Taskqueue client but I thought that was a bit overkill as we can lease and delete in the code anyway, so I used app identity, and it worked a treat.

from google.appengine.api import taskqueue
from google.appengine.api import app_identity
from google.appengine.api import urlfetch
try:
    import json
except ImportError:
    import simplejson as json
import logging

def get_queue_info(queue_name, stats=False):
    '''
        Uses the Queue REST API to fetch queue info
        Args:
            queue_name: string - the name of the queue
            stats: boolean - get the stats info too
        RETURNS:
            DICT: from the JSON response or False on fail
    '''
    scope = 'https://www.googleapis.com/auth/taskqueue'
    authorization_token, _ = app_identity.get_access_token(scope)
    app_id = app_identity.get_application_id()
    #note the s~ denoting HRD its not mentioned in the docs as far as 
    #I can see, but it wont work without it
    uri = 'https://www.googleapis.com/taskqueue/v1beta1/projects/s~%s/taskqueues/%s?getStats=%s' % (app_id, queue_name, stats)
    #make the call to the API
    response = urlfetch.fetch(uri, method="GET", headers = {"Authorization": "OAuth " + authorization_token})
    if response.status_code == 200:
        result = json.loads(response.content)
    else:
        logging.error('could not get queue')
        logging.error(response.status_code)
        logging.error(response.content)
        return False


    return result

Don't forget to update your queue.yaml with the acl for your app identity

-name: queue_name
 mode: pull
 acl:
 - user_email: [email protected]

I hope someone finds this useful.

In the meantime I have posted a Feature request so we can do this with the Queue object, please go and star it if you want it too. http://goo.gl/W8Pk1

1
votes

The immediate reason for the specific error you're getting seems to be a bug in the code; Queue.fetch_statistics() calls QueueStatistics.fetch() calls QueueStatistics._FetchMultipleQueues() which apparently encounters an apiproxy_errors.ApplicationError and then tries to call cls.__TranslateError() but there is no such method on the QueueStatistics class.

I don't know the deeper reason for the ApplicationError, but it may mean that the feature is not yet supported by the production runtime.