I cleaned up several examples I found online and came up with this. The code works, but I wonder if there is a better way of doing this?

user.py:


from asyncpg import create_pool
from sanic import Blueprint

bp = Blueprint('dp')

class pg:
    def __init__(self, pg_pool):
        self.pg_pool = pg_pool

    async def fetch(self, sql, *args, **kwargs):
        async with self.pg_pool.acquire() as connection:
            return await connection.fetch(sql, *args, **kwargs)

    async def execute(self, sql, *args, **kwargs):
        async with self.pg_pool.acquire() as connection:
            return await connection.execute(sql, *args, **kwargs)

@bp.listener('before_server_start')
async def init_pg(app, loop):
    """
    Init Postgresql DB.
    """
    # Store the pool and wrapper on the app so that webapp.py can reach them as app.pg.
    app.pg_pool = await create_pool(
        **app.config.PG_CFG,
        max_inactive_connection_lifetime=60,
        min_size=1,
        max_size=3,
        loop=loop,
    )
    app.pg = pg(app.pg_pool)
    print('-------- setup connection pool --------')

The pg wrapper can now be used in webapp.py.

webapp.py:


from sanic.response import json

@app.route("/")
async def root(req):
    result = await app.pg.fetch('SELECT * FROM foo')
    return json([dict(row) for row in result])  # Records -> dicts

1 Answer

I have not done this myself with Postgres, but I have done something similar with Redis. I also know that other people have used similar setups, with before_server_start creating the pool.

Another option would be to take a look at https://github.com/Skyscanner/aiotask-context and https://github.com/MagicStack/contextvars. They may help with attaching something like a connection to the loop.
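To give a rough idea of the contextvars approach, here is a minimal sketch using the standard-library contextvars module (available since Python 3.7; the MagicStack package is a backport for older versions). The names pg_pool_var, FakePool, and handler are hypothetical, and FakePool only stands in for a real asyncpg pool so the example runs without a database. Whether a value set inside a Sanic listener is visible in request handlers depends on how the server creates its tasks, so treat that part as an assumption to verify.

```python
import asyncio
import contextvars

# Hypothetical context variable that holds the shared pool.
pg_pool_var = contextvars.ContextVar("pg_pool")


class FakePool:
    """Stand-in for an asyncpg pool so the sketch runs without a database."""

    async def fetch(self, sql, *args):
        # Echo the query back instead of talking to Postgres.
        return [{"sql": sql, "args": args}]


async def handler():
    # Look the pool up from the current context instead of the app object.
    pool = pg_pool_var.get()
    return await pool.fetch("SELECT * FROM foo WHERE id = $1", 1)


async def main():
    pg_pool_var.set(FakePool())
    # A task created after set() copies the current context, so it sees the pool.
    return await asyncio.create_task(handler())


rows = asyncio.run(main())
```

The trade-off is that the dependency becomes implicit: handlers no longer need a reference to app, but a handler that runs in a context where the variable was never set will raise LookupError on get().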