I have a parsing application that basically does the following:

  • Adds the start() method to the IOLoop as a callback to be called on the next iteration
  • start() calls another function, let's call it get_input()
  • get_input() is a coroutine which fetches some data over the net, then schedules another coroutine, process_input(), by adding it to the IOLoop the same way start() was added in the first step
  • get_input() also checks a condition which depends on the fetched data and may schedule itself again with adjusted arguments

Now, once this condition evaluates to False, I know that there won't be any new input items to process.
But how do I know that there are no Futures of get_input() and process_input() which are still unresolved?

I suppose this could be solved by implementing a kind of counter, which is incremented every time process_input() is called and decremented after it is resolved.
But what if there is a chain of different coroutines? How can I track their state so that, if I stop the IOLoop, no tasks die before they are resolved?

Maybe there should be some kind of hierarchical counting...

edit:

To @dano: OK, I see now... I was inattentive. You really do not block, since get_input()'s own recursive call is inside this list.
But!

  1. Such an organization requires that only the yield construct is used, with no add_callback, as otherwise we lose the "waiting" concept
  2. The recursion level grows... Mmm, dunno if that's too bad

What I came up with today is a "metafuture".
I create a bare Future() object.
I decorate every @coroutine-enabled method with my decorator, which increments a counter field in the "metafuture" and adds a custom done callback to their futures that decrements it.

When the counter reaches zero, the "metafuture" resolves by calling set_result(None). There is also an IOLoop callback that yields exactly that metafuture:

    @coroutine
    def wait_for_complete(self):
      yield self._input_data_collected
      yield self._all_futures_resolved
      self.complete()

So after that we know no futures are pending. That's the hard way, like manually implementing refcounting, but it covers the IOLoop.add_callback() way of adding tasks as well.
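The "metafuture" idea described above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from my application: it uses the standard library's asyncio instead of Tornado's @coroutine (recent Tornado versions run on the asyncio event loop, so the idea should carry over), and the names PendingTracker, tracked and all_done are made up for the example. Error handling for failed tasks is left out.

```python
import asyncio
import functools


class PendingTracker:
    """Tracks in-flight tasks; `all_done` resolves once none remain."""

    def __init__(self):
        # Must be constructed while the event loop is running.
        self.all_done = asyncio.get_running_loop().create_future()
        # The set doubles as the counter and keeps the tasks referenced.
        self._tasks = set()

    @property
    def pending(self):
        return len(self._tasks)

    def tracked(self, coro_func):
        """Decorator: calling the function schedules it and counts it."""
        @functools.wraps(coro_func)
        def wrapper(*args, **kwargs):
            # Registered synchronously at call time, so a parent always
            # registers its children before it finishes itself.
            task = asyncio.ensure_future(coro_func(*args, **kwargs))
            self._tasks.add(task)
            task.add_done_callback(self._on_done)
            return task
        return wrapper

    def _on_done(self, task):
        self._tasks.discard(task)
        if not self._tasks and not self.all_done.done():
            self.all_done.set_result(None)


async def main():
    tracker = PendingTracker()
    processed = []

    @tracker.tracked
    async def process_input(item):
        await asyncio.sleep(0.01)  # stand-in for real processing
        processed.append(item)

    @tracker.tracked
    async def get_input(n):
        await asyncio.sleep(0.01)  # stand-in for a network fetch
        process_input(n)           # fire and forget, like add_callback
        if n > 1:
            get_input(n - 1)       # reschedule with adjusted arguments

    get_input(3)
    await tracker.all_done         # the "metafuture"
    return sorted(processed), tracker.pending


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The important detail is that a task is counted at the moment it is scheduled, not when it starts running. Otherwise get_input() could finish and drop the count to zero before the process_input() it just scheduled has been counted.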

Do you always want to stop the IOLoop after you're done processing? Or is the request to stop the IOLoop out of band and you want it to wait until the work is done before it actually does the stopping? - dano
@dano, as I understand it, if I call IOLoop.stop() with some futures still unresolved, they won't be processed. So I assume that I should manually track the moment when all processing is done and then stop() it. - Ojomio
That's true; IOLoop.stop() won't wait for outstanding work to finish. However, it's possible you can organize your application so that it automatically exits once it's done doing its work. It's hard to say without actually being able to see how your program is structured, though. - dano

1 Answer

You could write your methods so that they don't return until all the work is done, rather than scheduling callbacks. Then you could just call IOLoop.run_sync(start) and the call won't return until all the processing is complete:

    from tornado import gen
    from tornado.ioloop import IOLoop

    @gen.coroutine
    def start():
        yield get_input()

    @gen.coroutine
    def get_input(*args, **kwargs):
        # fetch_data_over_net() and should_call_myself() are placeholders
        # for your application's own code.
        data = yield fetch_data_over_net()
        futs = []  # list of Future objects
        futs.append(process_input(data))
        if should_call_myself(data):
            # newargs / newkwargs stand for the adjusted arguments.
            futs.append(get_input(*newargs, **newkwargs))
        yield futs  # This will wait for all Future objects in the list to complete.

    @gen.coroutine
    def process_input(data):
        pass  # do stuff

    if __name__ == "__main__":
        IOLoop.instance().run_sync(start)

We take advantage of the fact that coroutines return Futures, and that Tornado supports waiting for multiple Futures in parallel so that we can run as much as possible concurrently, without actually ever returning from get_input (and therefore start) before all the dependent work is done.
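For something you can run without Tornado installed, here is a rough sketch of the same structure using the standard library's asyncio, where asyncio.gather() plays the role of yielding a list of Futures. The paging logic, the has_more flag and the results list are assumptions made up for the example; they only stand in for your real fetch and your real "should I call myself again" condition.

```python
import asyncio


async def fetch_data_over_net(page):
    # Hypothetical stand-in for a real network call.
    await asyncio.sleep(0.01)
    return {"page": page, "has_more": page < 3}


async def process_input(data, results):
    await asyncio.sleep(0.01)  # stand-in for real processing
    results.append(data["page"])


async def get_input(page, results):
    data = await fetch_data_over_net(page)
    pending = [process_input(data, results)]
    if data["has_more"]:
        # Fetch the next page while this page is still being processed.
        pending.append(get_input(page + 1, results))
    # gather() runs the awaitables concurrently and returns when all finish.
    await asyncio.gather(*pending)


async def start():
    results = []
    await get_input(1, results)
    return results


if __name__ == "__main__":
    # asyncio.run() returns only after start(), and everything it awaits,
    # has completed, so no explicit stop() bookkeeping is needed.
    print(sorted(asyncio.run(start())))
```

Note that each get_input() call stays suspended until the calls it spawned have finished, so the chain of waiting coroutines grows with the number of fetches. Nothing blocks the event loop, but every level is held in memory until the whole chain completes.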