1
votes

I'm attempting to create an in-memory sqlite3 cache to store oauth tokens, but am running into issues regarding multithreading. After running several tests, I've noticed the behavior differs substantially from non-in-memory databases and multithreading.

Notably, reader threads immediately fail with "table is locked" if a writer thread has written without committing. This is true with multiple threads even with isolation_level=None.

It's not simply that readers are blocked until the transaction is complete, but rather they fail immediately, regardless of timeout or PRAGMA busy_timeout = 10000.

The only way I can get it working is to set isolation_level=None and to do PRAGMA read_uncommitted=TRUE. I would rather not do this, however.

Is it possible to let the reader threads simply wait the lock instead of immediately failing?


import sqlite3
import threading

def get_conn(name, is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=None):
    uri = 'file:%s' % name
    if is_memory:
        uri = uri + '?mode=memory&cache=shared'
    conn = sqlite3.connect(uri, uri=True, timeout=timeout, isolation_level=isolation_level)
    if pragmas is None:
        pragmas = []
    if not isinstance(pragmas, list):
        pragmas = [pragmas]
    for pragma in pragmas:
        conn.execute(pragma)
    return conn


def work1(name, is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=None, loops=1):
    conn = get_conn(name, is_memory=is_memory, timeout=timeout, isolation_level=isolation_level, pragmas=pragmas)
    for i in range(loops):
        conn.execute('INSERT INTO foo VALUES (1)')


def work2(name, is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=None, loops=1):
    conn = get_conn(name, is_memory=is_memory, timeout=timeout, isolation_level=isolation_level, pragmas=pragmas)
    for i in range(loops):
       len(conn.execute('SELECT * FROM foo').fetchall())


def main(name, is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=None, loops=1, num_threads=16):
    conn = get_conn(name, is_memory=is_memory, timeout=timeout, isolation_level=isolation_level, pragmas=pragmas)
    try:
        conn.execute('CREATE TABLE foo(a int)')
    except sqlite3.OperationalError:
        conn.execute('DROP TABLE foo')
        conn.execute('CREATE TABLE foo(a int)')
    threads = []
    for i in range(num_threads):
        threads.append(threading.Thread(target=work1, args=(name, is_memory, timeout, isolation_level, pragmas, loops)))
        threads.append(threading.Thread(target=work2, args=(name, is_memory, timeout, isolation_level, pragmas, loops)))
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

# In-Memory Tests
# All of these fail immediately with table is locked. There is no delay; timeout/busy_timeout has no effect.
main('a', is_memory=True, timeout=5, isolation_level='IMMEDIATE', pragmas=None)
main('b', is_memory=True, timeout=5, isolation_level='DEFERRED', pragmas=None)
main('c', is_memory=True, timeout=5, isolation_level='EXCLUSIVE', pragmas=None)
main('d', is_memory=True, timeout=5, isolation_level=None, pragmas=None)
main('e', is_memory=True, timeout=5, isolation_level='IMMEDIATE', pragmas=['PRAGMA busy_timeout = 10000'])
main('f', is_memory=True, timeout=5, isolation_level='DEFERRED', pragmas=['PRAGMA busy_timeout = 10000'])
main('g', is_memory=True, timeout=5, isolation_level='EXCLUSIVE', pragmas=['PRAGMA busy_timeout = 10000'])
main('h', is_memory=True, timeout=5, isolation_level=None, pragmas=['PRAGMA busy_timeout = 10000'])
main('i', is_memory=True, timeout=5, isolation_level='IMMEDIATE', pragmas=['PRAGMA read_uncommitted=TRUE'])
main('j', is_memory=True, timeout=5, isolation_level='DEFERRED', pragmas=['PRAGMA read_uncommitted=TRUE'])
main('k', is_memory=True, timeout=5, isolation_level='EXCLUSIVE', pragmas=['PRAGMA read_uncommitted=TRUE'])
# This is the only successful operation, when isolation_level = None and PRAGMA read_uncommitted=TRUE
main('l', is_memory=True, timeout=5, isolation_level=None, pragmas=['PRAGMA read_uncommitted=TRUE'])
# These start to take a really long time
main('m', is_memory=True, timeout=5, isolation_level=None, pragmas=['PRAGMA read_uncommitted=TRUE'], loops=100)
main('n', is_memory=True, timeout=5, isolation_level=None, pragmas=['PRAGMA read_uncommitted=TRUE'], loops=100, num_threads=128)

# None of the on disk DB's ever fail:
main('o', is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=None)
main('p', is_memory=False, timeout=5, isolation_level='DEFERRED', pragmas=None)
main('q', is_memory=False, timeout=5, isolation_level='EXCLUSIVE', pragmas=None)
main('r', is_memory=False, timeout=5, isolation_level=None, pragmas=None)
main('s', is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=['PRAGMA busy_timeout = 10000'])
main('t', is_memory=False, timeout=5, isolation_level='DEFERRED', pragmas=['PRAGMA busy_timeout = 10000'])
main('u', is_memory=False, timeout=5, isolation_level='EXCLUSIVE', pragmas=['PRAGMA busy_timeout = 10000'])
main('v', is_memory=False, timeout=5, isolation_level=None, pragmas=['PRAGMA busy_timeout = 10000'])
main('w', is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=['PRAGMA read_uncommitted=TRUE'])
main('x', is_memory=False, timeout=5, isolation_level='DEFERRED', pragmas=['PRAGMA read_uncommitted=TRUE'])
main('y', is_memory=False, timeout=5, isolation_level='EXCLUSIVE', pragmas=['PRAGMA read_uncommitted=TRUE'])
main('z', is_memory=False, timeout=5, isolation_level=None, pragmas=['PRAGMA read_uncommitted=TRUE'])
# These actually fail with database is locked
main('aa', is_memory=False, timeout=5, isolation_level=None, pragmas=['PRAGMA read_uncommitted=TRUE'], loops=100)
main('ab', is_memory=False, timeout=5, isolation_level=None, pragmas=['PRAGMA read_uncommitted=TRUE'], loops=100, num_threads=128)
1
You could build a queue for all database queries instead of trying to access at same time. The queue would handle that in its own thread and deliver stuff to the other threads.Saelyth
I second @Saelyth's suggestion. Having multiple threads share a SQLite3 connection sounds like a nightmare. Refusing to work is kind of a best-case scenario. Just have one thread that talks to the DB. Bonus, if you do decide to use an on-disk DB, this gives you the opportunity to make a sane decision about when to commit.Matthew Woodruff
@MatthewWoodruff According to my tests above, I am able to use multi-threading without any isses for on-disk Sqlite. I do not have experience with it in a production environment, however. What are the problems you would anticipate?Matthew Moisen

1 Answers

0
votes

I do not believe that the SQLite3 interface is meant to be re-entrant. I think that each thread would have to obtain a mutex, perform the query, and then release the mutex. Attempt to perform only one database operation at a time. (Python's API-layer would not be expected to do this, as there would ordinarily be no need for any such thing.)