I cleaned up several examples I found online and came up with this. The code works, but I wonder whether there is a better way of doing it.
user.py:
from asyncpg import create_pool
from sanic import Blueprint
# Blueprint whose 'before_server_start' listener (init_pg) sets up the PostgreSQL pool.
bp = Blueprint('dp')
class pg:
    """Small query helper that runs statements on pooled connections.

    Each call borrows one connection from the pool for the duration of the
    statement and hands it back when the ``async with`` block exits.
    """

    def __init__(self, pg_pool):
        """Remember the pool that connections are acquired from.

        Args:
            pg_pool: Object exposing ``acquire()`` that returns an async
                context manager yielding a connection (here the pool built
                by ``create_pool``).
        """
        self.pg_pool = pg_pool

    async def fetch(self, sql, *args, **kwargs):
        """Run ``sql`` via ``connection.fetch`` and return its result.

        Args:
            sql: Query text.
            *args: Positional arguments forwarded unchanged to
                ``connection.fetch``.
            **kwargs: Keyword arguments forwarded unchanged to
                ``connection.fetch``.

        Returns:
            Whatever ``connection.fetch`` returns for the query.
        """
        async with self.pg_pool.acquire() as connection:
            return await connection.fetch(sql, *args, **kwargs)

    async def execute(self, sql, *args, **kwargs):
        """Run ``sql`` via ``connection.execute`` and return its result.

        Args:
            sql: Statement text.
            *args: Positional arguments forwarded unchanged to
                ``connection.execute``.
            **kwargs: Keyword arguments forwarded unchanged to
                ``connection.execute``.

        Returns:
            Whatever ``connection.execute`` returns for the statement.
        """
        async with self.pg_pool.acquire() as connection:
            return await connection.execute(sql, *args, **kwargs)
@bp.listener('before_server_start')
async def init_pg(app, loop):
    """Create the PostgreSQL connection pool before the server starts.

    The pool is built from ``app.config.PG_CFG`` with 1 to 3 connections
    and a 60-second limit on inactive connection lifetime. The pool and a
    ``pg`` helper wrapping it are stored on both the blueprint and the app.

    Args:
        app: The Sanic application; its ``config.PG_CFG`` mapping supplies
            the connection keyword arguments for ``create_pool``.
        loop: Event loop passed by Sanic and forwarded to ``create_pool``.
    """
    bp.pg_pool = await create_pool(
        **app.config.PG_CFG,
        max_inactive_connection_lifetime=60,
        min_size=1,
        max_size=3,
        loop=loop,
    )
    bp.pg = pg(bp.pg_pool)
    # The route handler in webapp.py reads ``app.pg``; without these two
    # assignments that attribute is never set and the handler fails.
    app.pg_pool = bp.pg_pool
    app.pg = bp.pg
    print('-------- setup connection pool --------')
Now use the pg class in webapp.py.
webapp.py:
@app.route("/")
async def root(req):
result = await app.pg.fetch('SELECT * FROM foo')