Async Example/Tour

This shows how to use APSW in async node. asyncio, trio (note), and anyio (note) are supported. A different one is used for each block, although they all work.

Note

You don’t have to make all connections exclusively sync or async, and can mix and match as needed. SQLite is fast, and in many cases there may not be a benefit to concurrency with async.

#!/usr/bin/env python3

# This code uses Python's optional typing annotations.  You can
# ignore them and do not need to use them.  If you do use them
# then you must include this future annotations line first.
from __future__ import annotations

import contextlib
import io
import time
import sys

from pprint import pprint

import apsw
import apsw.aio
import apsw.bestpractice
import apsw.ext
import apsw.fts5
import apsw.shell

# all the popular async frameworks are supported
import asyncio
import anyio
import trio

Basics

Use Connection.as_async() to get an async connection, and async for to iterate results. We also apply best practice and use contextlib.aclosing() to ensure the database is closed.

apsw.bestpractice.apply(apsw.bestpractice.recommended)


async def basics():
    db = await apsw.Connection.as_async(":memory:")

    # always close database
    async with contextlib.aclosing(db):
        # create some rows
        await db.execute("""
            CREATE TABLE numbers(value, name);
            INSERT INTO numbers VALUES (1, 'one'), (20, 'twenty'),
                         (3, 'three'), (10, 'ten');
        """)

        # query - note we have to await to get the cursor before
        # iterating in the for loop
        async for value, name in await db.execute(
            "SELECT value, name FROM numbers ORDER BY value DESC"
        ):
            print(f"{value=} {name=}")

        # .get is great if you expect only a single value or
        # row.  Lets get the number of registered functions
        count = await (
            await db.execute(
                "SELECT COUNT(DISTINCT(name)) FROM pragma_function_list"
            )
        ).get
        print(f"There are {count} functions")

        # a pragma
        print(f"journal_mode={await db.pragma('journal_mode')}")

        # You should always use a transaction - use async with
        async with db:
            await db.execute("INSERT INTO numbers VALUES(7, 'seven')")
            # nested transactions are supported via savepoints
            async with db:
                await db.execute("DROP TABLE numbers")
                # any exception in the async with block
                # will rollback that block, while successful
                # completion commits the changes


asyncio.run(basics())
value=20 name='twenty'
value=10 name='ten'
value=3 name='three'
value=1 name='one'
There are 174 functions
journal_mode=memory

Functions and callbacks

Any function/callback can be sync or async.

# an async scalar function
async def a_add(one, two):
    print("async scalar called")
    return one + two


# and a sync scalar function
def s_add(one, two):
    print("sync scalar called")
    return one + two


# how about an async update hook?
async def my_hook(op: int, dbname: str, table: str, rowid: int):
    print(f"update {op=} {dbname=} {table=} {rowid=}")


async def callbacks():
    db = await apsw.Connection.as_async(":memory:")

    # always close database
    async with contextlib.aclosing(db):
        await db.create_scalar_function("a_add", a_add)
        await db.create_scalar_function("s_add", s_add)

        # use them both in the same query.  we need to await execute
        # to get the cursor, and then await the fetchall on the
        # cursor.
        print(
            await (
                await db.execute("SELECT a_add(1, 2), s_add(3, 4)")
            ).fetchall()
        )

        # A regular callback
        await db.set_update_hook(my_hook)

        await db.execute(
            "CREATE TABLE x(y); INSERT INTO x VALUES(42)"
        )


# use anyio this time
anyio.run(callbacks)
async scalar called
sync scalar called
[(3, 7)]
update op=18 dbname='main' table='x' rowid=1

Cancellation

Often a group of tasks are run at the same time. The frameworks provide a way to group tasks, wait until all are complete, and if any fail, then cancel uncompleted ones, and raise the resulting exceptions.

See also

This example shows asyncio, but the principles are the same across all the frameworks.

async def cancellation():
    # this will block SQLite for the sleep duration
    async def sleep(duration):
        await asyncio.sleep(duration)
        return duration

    async def deliberate_error():
        1 / 0
        return 3

    db = await apsw.Connection.as_async(":memory:")

    # always close database
    async with contextlib.aclosing(db):
        await db.create_scalar_function("sleep", sleep)

        start = time.monotonic()

        # create some tasks in a task group that will all
        # run simultaneously
        try:
            async with asyncio.TaskGroup() as tg:
                # this query will sleep for an hour
                task1 = tg.create_task(
                    db.execute("SELECT sleep(3600)")
                )
                # this query will only run after the hour sleep query
                # finishes because we can only do one SQLite query at
                # a time
                task2 = tg.create_task(
                    db.execute("SELECT * FROM sqlite_schema")
                )
                # this will also sleep for an hour
                task3 = tg.create_task(asyncio.sleep(3600))
                # this will have an error
                task4 = tg.create_task(deliberate_error())

                # the TaskGroup with block will now run all the tasks
                # to completion before exiting the block

        # note the * after except which is how you do exception groups
        except* ZeroDivisionError:
            print(
                f"got zero division error after {time.monotonic() - start:.6f} seconds"
            )

        # Lets see what happened to all the tasks.  Note how they are
        # all done (complete) and how all except the deliberate error
        # got cancelled.
        print(f"{task1.done()=} {task1.cancelled()=}")
        print(f"{task2.done()=} {task2.cancelled()=}")
        print(f"{task3.done()=} {task3.cancelled()=}")
        print(f"{task4.done()=} {task4.cancelled()=}")

        # Lets verify SQLite is not still waiting for an hour
        start = time.monotonic()
        functions = await (
            await db.execute(
                "SELECT COUNT(*) FROM pragma_function_list"
            )
        ).get
        print(
            f"After {time.monotonic() - start:.6f} seconds, "
            f"there are {functions} registered SQLite functions"
        )


asyncio.run(cancellation())
got zero division error after 0.000096 seconds
task1.done()=True task1.cancelled()=True
task2.done()=True task2.cancelled()=True
task3.done()=True task3.cancelled()=True
task4.done()=True task4.cancelled()=False
After 0.000255 seconds, there are 212 registered SQLite functions

Timeouts

This demonstrates timeouts for both async and sync code. The sync SQL is the outlandish fractal but with the 28 changed to 800_000` and would take days to run to completion.

The deadline for async functions is enforced by the async event loop and tends to be accurate. The deadline for sync functions is based on SQLite periodically calling the progress handler.

There is a dedicated apsw.aio.deadline which takes priority for all frameworks, For trio (note) and anyio (note) their native timeouts are also supported if apsw.aio.deadline has not been set. The deadline documentation has more details on setting deadlines for each framework, getting their current time, and exceptions raised on timeout.

# The query is not reproduced here but is used when running this
# example.
fractal_sql = "outlandish fractal"


async def timeouts():
    async def sleep(amount):
        await trio.sleep(amount)
        return 42

    db = await apsw.Connection.as_async(":memory:")

    # always close database
    async with contextlib.aclosing(db):
        await db.create_scalar_function("sleep", sleep)

        try:
            # This will work with every framework.  Half a second from now.
            with apsw.aio.contextvar_set(
                apsw.aio.deadline, trio.current_time() + 0.5
            ):
                start = trio.current_time()
                await (
                    await db.execute("SELECT sleep(3600)")
                ).fetchall()
        except trio.TooSlowError:
            end = trio.current_time()
            print(
                f"Got async function TooSlowError after {end - start:.6f} seconds"
            )

        # With trio and anyio we can use the native framework timeouts
        try:
            with trio.fail_after(0.5):
                await (await db.execute(fractal_sql)).fetchall()
        except trio.TooSlowError:
            end = trio.current_time()
            print(
                f"Got sync function TooSlowError after {end - start:.6f} seconds"
            )


trio.run(timeouts)
Got async function TooSlowError after 0.501280 seconds
Got sync function TooSlowError after 1.001975 seconds

Worker thread

Async connections work by running the SQLite operations in a dedicated background thread. You can also run your own code there which is especially useful if it does many calls before returning a final result. Use Connection.async_run().

Examples shown include Managing and updating your schema and getting a text dump.

In the worker thread, the connection is a regular sync connection.

def schema_upgrade(db: apsw.Connection):
    # The user_version is a great way of tracking and upgrading the
    # schema.  Because this is run in the worker thread it is the
    # normal sync approach.

    # Do everything in a single transactions
    with db:
        # database fresh state
        if db.pragma("user_version") == 0:
            db.execute("""
                CREATE TABLE products(id, name, sku, price);
                CREATE TABLE orders(id, product_id, quantity);
                pragma user_version = 1;
            """)
        if db.pragma("user_version") == 1:
            db.execute("""
                ALTER TABLE products ADD COLUMN description;
                CREATE INDEX orders_idx ON orders(id, product_id);
                pragma user_version = 2;
            """)
        # we could exception here if user_version > 2 because it means
        # a more recent schema is present than this code understands.
        # Perhaps a version downgrade happened?


async def worker_thread():
    db = await apsw.Connection.as_async(":memory:")

    # always close database
    async with contextlib.aclosing(db):
        # Do the upgrade
        await db.async_run(schema_upgrade, db)

        # This is doing one operation
        await db.async_run(
            db.execute,
            "INSERT INTO products(id, name) VALUES(?,?)",
            (37, "Banana"),
        )

        # Getting a result
        rows = await db.async_run(
            lambda: db.execute("SELECT COUNT(*) FROM products").get
        )
        print(f"{rows=}")

        # get a dump - to a memory file here, but you'd want to use
        # a real file
        out = io.StringIO()
        await db.async_run(
            lambda: apsw.shell.Shell(
                db=db, stdout=out
            ).process_command(".dump")
        )
        dump = out.getvalue()
        print(f"Dump is {len(dump)} chars starting {repr(dump):.40}")

        # Some stuff from apsw.ext
        usage = await db.async_run(apsw.ext.analyze_pages, db, 2)
        details = await db.async_run(
            apsw.ext.query_info, db, "SELECT * FROM sqlite_schema"
        )


anyio.run(worker_thread)
rows=1
Dump is 747 chars starting "-- SQLite dump (by APSW 3.52.0.0)\n-- S

Virtual tables

Virtual tables are a very good match for async, especially if networking is involved. You can do your own full implementation using sync and async methods as needed, but will find it easier to start with apsw.ext.make_virtual_module() that turns any Python function (sync or async) in a virtual table accepting positional and keyword arguments. The sync example is here with the async below being pretty much the same thing.

async def data_table(flags, server="example.com"):
    # one positional and one keyword argument.  note you can't change
    # the number of columns or their order based on the arguments but
    # you can change what rows are returned and their contents
    print(f"data_table called with {flags=} and {server=}")

    # you would do real work here - we just provide some books
    yield ("The Great Gatsby", 1925, 9.2)
    yield ("To Kill a Mockingbird", 1960, 9.5)
    yield ("1984", 1949, 9.8)
    yield ("The Catcher in the Rye", 1951, 8.4)
    yield ("The Hobbit", 1937, 9.6)


# Tell make_virtual_module about the columns
data_table.columns = ("title", "year", "review")
# ... and how to extract them from each row
data_table.column_access = apsw.ext.VTColumnAccess.By_Index


async def virtual_tables():
    db = await apsw.Connection.as_async(":memory:")

    # always close database
    async with contextlib.aclosing(db):
        await apsw.ext.make_virtual_module(db, "books", data_table)

        # regular query
        async for row in await db.execute(
            "SELECT * FROM books WHERE flags=94 AND server=?",
            ("example2.com",),
        ):
            print(row)

        # SQLite will figure out the review and order work
        async for row in await db.execute(
            "SELECT * FROM books WHERE server=? AND flags=?AND review > 9.55 "
            "   ORDER BY year DESC",
            ("orange", -2),
        ):
            print(row)


trio.run(virtual_tables)
data_table called with flags=94 and server='example2.com'
('The Great Gatsby', 1925, 9.2)
('To Kill a Mockingbird', 1960, 9.5)
('1984', 1949, 9.8)
('The Catcher in the Rye', 1951, 8.4)
('The Hobbit', 1937, 9.6)
data_table called with flags=-2 and server='orange'
('1984', 1949, 9.8)
('The Hobbit', 1937, 9.6)

Tracing in a block

This is the same as sync tracing in a block adapted to use async with for apsw.ext.Trace and transaction control.

async def tracing():
    db = await apsw.Connection.as_async(":memory:")

    # always close database
    async with contextlib.aclosing(db):
        # Use None instead of stdout and no information is printed or gathered
        async with apsw.ext.Trace(
            sys.stdout,
            db=db,
            vtable=True,
            updates=True,
            transaction=True,
        ):
            # APSW does a savepoint behind the scenes to wrap the block
            async with db:
                # Some regular SQL
                await db.execute("create table multi(x)")
                # executemany runs the same statement repeatedly
                await db.executemany(
                    "insert into multi values(?)",
                    ((x,) for x in range(5)),
                )
                # See how many rows were processed
                await (
                    await db.execute("select * from multi limit 2")
                ).fetchall()
                # You can also see how many rows were changed
                await db.execute("delete from multi where x < 4")

            # pragma functions are virtual tables - see how many rows this processes even
            # though only one has 'pow'
            await (
                await db.execute(
                    "SELECT narg FROM pragma_function_list WHERE name='pow'"
                )
            ).get

            # trigger that causes rollback
            await db.execute("""
                create trigger error after insert on multi
                begin
                update multi set rowid=100+new.rowid where rowid=new.rowid;
                select raise(rollback, 'nope');
            end;
            """)

            with contextlib.suppress(apsw.ConstraintError):
                await db.execute("insert into multi values(54)")


asyncio.run(tracing())
> BEGIN DEFERRED
    Time: 0.000
 !BEGIN
> create table multi(x)
    Time: 0.000
> insert into multi values(?)
  INS 1 (0)
    Time: 0.000  Changes: 1
> insert into multi values(?)
  INS 2 (1)
    Time: 0.000  Changes: 1
> insert into multi values(?)
  INS 3 (2)
    Time: 0.000  Changes: 1
> insert into multi values(?)
  INS 4 (3)
    Time: 0.000  Changes: 1
> insert into multi values(?)
  INS 5 (4)
    Time: 0.000  Changes: 1
> select * from multi limit 2
    Time: 0.000  Rows: 3
> delete from multi where x < 4
  DEL 1 (0)
  DEL 2 (1)
  DEL 3 (2)
  DEL 4 (3)
    Time: 0.000  Changes: 4
> COMMIT
 !COMMIT
    Time: 0.000
> SELECT narg FROM pragma_function_list WHERE name='pow'
V PRAGMA function_list
    Time: 0.000  Rows: 211  VmStep: 1,481  Mem: 76.8KB
< SELECT narg FROM pragma_function_list WHERE name='pow'
    Time: 0.000  Rows: 1  VmStep: 644
 !BEGIN
> create trigger error after insert on multi
               begin
         ...
 !COMMIT
    Time: 0.000
 !BEGIN
> insert into multi values(54)
  INS 6 (54)
T TRIGGER error
    UPD 6>106 (...)
 !ROLLBACK
< insert into multi values(54)
    Time: 0.001  Changes: 1

Resource usage in a block

The async equivalent of the sync example.

# The standard outlandish fractal example which is used when running
# but not reproduced here.
query = "fractal"


async def resource_usage():
    db = await apsw.Connection.as_async(":memory:")

    # always close database
    async with contextlib.aclosing(db):
        print("thread (async event loop)")

        async with apsw.ext.ShowResourceUsage(
            sys.stdout, db=db, scope="thread"
        ):
            # some SQLite work
            await (await db.execute(query)).get

            # and take some wall clock time
            await trio.sleep(0.5)

        print("\nprocess (including background SQLite worker)")

        async with apsw.ext.ShowResourceUsage(
            sys.stdout, db=db, scope="process"
        ):
            # some SQLite work
            await (await db.execute(query)).get

            # and take some wall clock time
            await trio.sleep(0.5)


trio.run(resource_usage)
thread (async event loop)
                     Process CPU consumption 0.016
                                  Wall clock 0.517
                  Voluntary context switches 4
                      SQLite full table scan 1,365
                      SQLite sort operations 2
                        SQLite vm operations 1,015,353
                 SQLite statements completed 1
                 SQLite lookaside slots used 11
          SQLite allocations using lookaside 17,414
    SQLite allocations too big for lookaside 1
           SQLite allocations lookaside full 282
                     SQLite statement memory 17,080

process (including background SQLite worker)
                     Process CPU consumption 0.015
                                  Wall clock 0.516
                        Page faults - no I/O 3
                Involuntary context switches 63
                  Voluntary context switches 8
                       CPU time in user mode 0.015
                      SQLite full table scan 1,365
                      SQLite sort operations 2
                        SQLite vm operations 1,015,353
                 SQLite statements completed 1
          SQLite allocations using lookaside 17,272

Blob

Async Blob Input/Output run in the SQLite worker thread. See the sync example which this is a direct translation to async.

async def blob():
    db = await apsw.Connection.as_async(":memory:")

    # always close database
    async with contextlib.aclosing(db):
        await db.execute("create table blobby(x,y)")
        # Add a blob we will fill in later
        await db.execute(
            "insert into blobby values(1, zeroblob(10000))"
        )
        # Or as a binding
        await db.execute(
            "insert into blobby values(2, ?)", (apsw.zeroblob(20000),)
        )
        # Open a blob for writing.  We need to know the rowid
        rowid = await (
            await db.execute("select ROWID from blobby where x=1")
        ).get
        blob = await db.blob_open("main", "blobby", "y", rowid, True)
        await blob.write(b"hello world")
        # seeking is immediate (no await)
        blob.seek(2000)
        await blob.read(24)
        # seek relative to the end
        blob.seek(-32, 2)
        await blob.write(b"hello world, again")
        # it will be automatically closed when the connection is
        # closed, but explicitly closing chooses transaction
        # boundaries
        await blob.aclose()


anyio.run(blob)

Backup

Backups run in the SQLite worker thread of the async destination database. The source can be a sync or async database. You do backups by getting the backup object from the destination database telling it about the source using Connection.backup(),

If the destination is sync and you are working with an async source, you can run the backup in the async source thread as demonstrated below.

async def backup():
    # Setup source and destinations

    async_source = await apsw.Connection.as_async("")
    # ... and fill it with a large amount of data
    await async_source.execute(
        "CREATE TABLE x(y); INSERT INTO x VALUES(randomblob(250000))"
    )
    sync_source = apsw.Connection("")
    sync_source.execute(
        "CREATE TABLE x(y); INSERT INTO x VALUES(randomblob(250000))"
    )

    async_dest = await apsw.Connection.as_async("")
    sync_dest = apsw.Connection("")

    print("async destination, async source")
    async with await async_dest.backup(
        "main", async_source, "main"
    ) as backup:
        while not backup.done:
            await backup.step(42)
            print(
                f"page_count = {backup.page_count} remaining = {backup.remaining}"
            )

    print("async destination, sync source")
    async with await async_dest.backup(
        "main", sync_source, "main"
    ) as backup:
        while not backup.done:
            await backup.step(42)
            print(
                f"page_count = {backup.page_count} remaining = {backup.remaining}"
            )

    print("sync destination, async source")

    # we will run this in the async source thread
    def do_backup():
        with sync_dest.backup("main", async_source, "main") as backup:
            while not backup.done:
                backup.step(42)
                print(
                    f"page_count = {backup.page_count} remaining = {backup.remaining}"
                )

    await async_source.async_run(do_backup)

    # ensure connections get closed
    await async_source.aclose()
    await async_dest.aclose()


asyncio.run(backup())
async destination, async source
page_count = 63 remaining = 21
page_count = 63 remaining = 0
async destination, sync source
page_count = 63 remaining = 21
page_count = 63 remaining = 0
sync destination, async source
page_count = 63 remaining = 21
page_count = 63 remaining = 0

Session

Use apsw.aio.make_session() to create the Session object in async mode from an async connection.

async def session_example():
    db = await apsw.Connection.as_async(":memory:")

    # always close database
    async with contextlib.aclosing(db):
        await db.execute("CREATE TABLE x(y PRIMARY KEY, z)")

        session = await apsw.aio.make_session(db, "main")

        # We'd like size estimates
        session.config(apsw.SQLITE_SESSION_OBJCONFIG_SIZE, True)

        # all tables
        await session.attach()

        # add some data
        await db.executemany(
            "INSERT INTO x VALUES(?,?)",
            ((i, "a" * i) for i in range(200)),
        )

        print("Size estimate {session.changeset_size}")
        changeset = await session.changeset()
        print(f"Actual size {len(changeset)}")

        # Other than apply, changeset operations don't use a
        # Connection so we'll use trio's mechanism to do invert in a
        # background thread.
        undo = await trio.to_thread.run_sync(
            apsw.Changeset.invert, changeset
        )

        #  Undo the changes
        await apsw.Changeset.apply(undo, db)


trio.run(session_example)