77
votes

I'm migrating from tornado to asyncio, and I can't find the asyncio equivalent of tornado's PeriodicCallback. (A PeriodicCallback takes two arguments: the function to run and the number of milliseconds between calls.)

  • Is there such an equivalent in asyncio?
  • If not, what would be the cleanest way to implement this without running the risk of getting a RecursionError after a while?
Why do you need to move from tornado? They can work together, no? tornadoweb.org/en/stable/asyncio.html - OneCricketeer
Just add await asyncio.sleep(time) to your function. - songololo
Same with Twisted, no LoopingCall implementation. - zgoda

6 Answers

71
votes

For Python versions below 3.5 (this generator-based style relies on @asyncio.coroutine, which was removed in Python 3.11):

import asyncio

@asyncio.coroutine
def periodic():
    while True:
        print('periodic')
        yield from asyncio.sleep(1)

def stop():
    task.cancel()

loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass

For Python 3.5 and above:

import asyncio

async def periodic():
    while True:
        print('periodic')
        await asyncio.sleep(1)

def stop():
    task.cancel()

loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass
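
On Python 3.7 and above, the same pattern can also be written with asyncio.run() and asyncio.create_task(). The sketch below is only an illustration: call_periodically and its interval_ms parameter are hypothetical names chosen to mirror the (function, milliseconds) arguments of tornado's PeriodicCallback from the question, and are not part of asyncio.

```python
import asyncio


async def call_periodically(callback, interval_ms):
    """Call callback() every interval_ms milliseconds until cancelled.

    Hypothetical helper, not an asyncio API.
    """
    while True:
        callback()
        await asyncio.sleep(interval_ms / 1000)


async def main(run_for=0.35, interval_ms=100):
    calls = []
    task = asyncio.create_task(
        call_periodically(lambda: calls.append(1), interval_ms)
    )
    await asyncio.sleep(run_for)  # let the periodic task run for a while
    task.cancel()                 # request cancellation
    try:
        await task                # wait until the task has actually stopped
    except asyncio.CancelledError:
        pass
    return len(calls)


if __name__ == '__main__':
    print('callback ran', asyncio.run(main()), 'times')
```

Because the loop is a plain while True inside a coroutine, no call stack builds up between iterations, so there is no risk of a RecursionError.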
33
votes

When you feel that something should happen "in the background" of your asyncio program, asyncio.Task might be a good way to do it. You can read this post to see how to work with tasks.

Here's a possible implementation of a class that executes some function periodically:

import asyncio
from contextlib import suppress


class Periodic:
    def __init__(self, func, time):
        self.func = func
        self.time = time
        self.is_started = False
        self._task = None

    async def start(self):
        if not self.is_started:
            self.is_started = True
            # Start task to call func periodically:
            self._task = asyncio.ensure_future(self._run())

    async def stop(self):
        if self.is_started:
            self.is_started = False
            # Stop task and await it stopped:
            self._task.cancel()
            with suppress(asyncio.CancelledError):
                await self._task

    async def _run(self):
        while True:
            await asyncio.sleep(self.time)
            self.func()

Let's test it:

async def main():
    p = Periodic(lambda: print('test'), 1)
    try:
        print('Start')
        await p.start()
        await asyncio.sleep(3.1)

        print('Stop')
        await p.stop()
        await asyncio.sleep(3.1)

        print('Start')
        await p.start()
        await asyncio.sleep(3.1)
    finally:
        await p.stop()  # we should stop task finally


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Output:

Start
test
test
test

Stop

Start
test
test
test

[Finished in 9.5s]

As you can see, on start we just start a task that calls the function and sleeps for some time in an endless loop. On stop we just cancel that task. Note that the task should be stopped by the time the program finishes.

One more important thing: your callback shouldn't take much time to execute (or it will freeze your event loop). If you're planning to call a long-running function, you would probably need to run it in an executor.
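
To illustrate the executor remark, here is a minimal sketch of a periodic loop that hands a blocking function to the loop's default thread pool through loop.run_in_executor(). The names slow_callback and run_periodically, and the repeats parameter that keeps the demo finite, are assumptions made for this example only.

```python
import asyncio
import time


def slow_callback():
    # Hypothetical blocking function; stands in for real long-running work.
    time.sleep(0.2)
    return 'done'


async def run_periodically(func, interval, repeats):
    """Run blocking func in the default executor, sleeping interval seconds before each run."""
    loop = asyncio.get_running_loop()
    results = []
    for _ in range(repeats):
        await asyncio.sleep(interval)
        # Passing None selects the loop's default ThreadPoolExecutor, so
        # the event loop stays free to run other tasks while func blocks.
        results.append(await loop.run_in_executor(None, func))
    return results


if __name__ == '__main__':
    print(asyncio.run(run_periodically(slow_callback, 0.1, 2)))
```

In the Periodic class above, the same idea would mean awaiting loop.run_in_executor(None, self.func) inside _run instead of calling self.func() directly.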

23
votes

There is no built-in support for periodic calls, no.

Just create your own scheduler loop that sleeps and executes any tasks scheduled:

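import asyncio
import math
import time

async def scheduler():
    while True:
        # sleep until the next whole second
        now = time.time()
        await asyncio.sleep(math.ceil(now) - now)

        # execute any scheduled tasks; scheduled_tasks() is described below
        # (use "async for" instead if it is written as an async generator)
        for task in scheduled_tasks(time.time()):
            await task()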

The scheduled_tasks() iterator should produce tasks that are ready to be run at the given time. Note that producing the schedule and kicking off all the tasks could in theory take longer than 1 second; the idea here is that the scheduler yields all tasks that should have started since the last check.

14
votes

A variant that may be helpful: if you want your recurring call to happen every n seconds instead of n seconds between the end of the last execution and the beginning of the next, and you don't want calls to overlap in time, the following is simpler:

import asyncio


async def repeat(interval, func, *args, **kwargs):
    """Run func every interval seconds.

    If func has not finished before *interval*, will run again
    immediately when the previous iteration finished.

    *args and **kwargs are passed as the arguments to func.
    """
    while True:
        await asyncio.gather(
            func(*args, **kwargs),
            asyncio.sleep(interval),
        )

And an example of using it to run a couple of tasks in the background:

async def f():
    await asyncio.sleep(1)
    print('Hello')


async def g():
    await asyncio.sleep(0.5)
    print('Goodbye')


async def main():
    t1 = asyncio.ensure_future(repeat(3, f))
    t2 = asyncio.ensure_future(repeat(2, g))
    await t1
    await t2

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
10
votes

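An alternative version using a decorator, for Python 3.7 and above. Note that each call is started as its own task, so calls can overlap when the function takes longer than the period (as in the example, where the work takes 5 seconds and the period is 2):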

import asyncio
import time


def periodic(period):
    def scheduler(fcn):

        async def wrapper(*args, **kwargs):

            while True:
                asyncio.create_task(fcn(*args, **kwargs))
                await asyncio.sleep(period)

        return wrapper

    return scheduler


@periodic(2)
async def do_something(*args, **kwargs):
    await asyncio.sleep(5)  # Do some heavy calculation
    print(time.time())


if __name__ == '__main__':
    asyncio.run(do_something('Maluzinha do papai!', secret=42))
4
votes

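Based on @A. Jesse Jiryu Davis's answer (with the comments from @Torkel Bjørnson-Langen and @ReWrite), this is an improvement that avoids drift. It uses the generator-based coroutine style, so it needs a Python version below 3.11, where @asyncio.coroutine was removed.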

import time
import asyncio

@asyncio.coroutine
def periodic(period):
    def g_tick():
        t = time.time()
        count = 0
        while True:
            count += 1
            yield max(t + count * period - time.time(), 0)
    g = g_tick()

    while True:
        print('periodic', time.time())
        yield from asyncio.sleep(next(g))

loop = asyncio.get_event_loop()
task = loop.create_task(periodic(1))
loop.call_later(5, task.cancel)

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass
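
For newer Python versions, the same drift-avoiding idea can be sketched with async/await. This is not the answer's code: the callback and ticks parameters are hypothetical additions for illustration, and loop.time() is swapped in for time.time() so that wall-clock adjustments do not affect the schedule.

```python
import asyncio


async def periodic(period, callback, ticks=None):
    """Call callback() every period seconds, measured from a fixed start.

    ticks is a hypothetical parameter that limits the number of calls;
    None means run until the task is cancelled.
    """
    loop = asyncio.get_running_loop()
    start = loop.time()  # monotonic event loop clock
    count = 0
    while ticks is None or count < ticks:
        callback()
        count += 1
        # Sleep until start + count * period. Each deadline is computed
        # from the fixed start, so per-iteration delays do not accumulate.
        await asyncio.sleep(max(start + count * period - loop.time(), 0))


if __name__ == '__main__':
    asyncio.run(periodic(0.5, lambda: print('periodic'), ticks=3))
```

If a callback overruns its slot, the max(..., 0) clamp makes the next call happen right away instead of passing a negative delay to asyncio.sleep().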