.. Automatically generated by example2rst.py. Do not edit this file

.. currentmodule:: apsw

Async Example/Tour
==================

This shows how to use APSW in :doc:`async` mode. :mod:`asyncio`,
|trio|, and |anyio| are supported. A different framework is used in
each block, although any of them would work.

.. note::

    You don't have to make all connections exclusively sync or async,
    and can mix and match as needed. SQLite is fast, and in many cases
    there may not be a benefit to concurrency with async.

.. code-block:: python

    #!/usr/bin/env python3

    # This code uses Python's optional typing annotations. You can
    # ignore them and do not need to use them. If you do use them
    # then you must include this future annotations line first.
    from __future__ import annotations

    import contextlib
    import io
    import time
    import sys
    from pprint import pprint

    import apsw
    import apsw.aio
    import apsw.bestpractice
    import apsw.ext
    import apsw.fts5
    import apsw.shell

    # all the popular async frameworks are supported
    import asyncio
    import anyio
    import trio

.. index:: Basics (Async example code)

.. _example_async_basics:

Basics
------

Use :meth:`Connection.as_async` to get an async connection, and
:code:`async for` to iterate results. We also apply
:doc:`best practice` and use :func:`contextlib.aclosing` to ensure
the database is closed.

.. code-block:: python

    apsw.bestpractice.apply(apsw.bestpractice.recommended)

    async def basics():
        db = await apsw.Connection.as_async(":memory:")

        # always close database
        async with contextlib.aclosing(db):
            # create some rows
            await db.execute("""
                CREATE TABLE numbers(value, name);
                INSERT INTO numbers VALUES
                    (1, 'one'),
                    (20, 'twenty'),
                    (3, 'three'),
                    (10, 'ten');
                """)

            # query - note we have to await to get the cursor before
            # iterating in the for loop
            async for value, name in await db.execute(
                "SELECT value, name FROM numbers ORDER BY value DESC"
            ):
                print(f"{value=} {name=}")

            # .get is great if you expect only a single value or
            # row.
            # Let's get the number of registered functions
            count = await (
                await db.execute(
                    "SELECT COUNT(DISTINCT(name)) FROM pragma_function_list"
                )
            ).get
            print(f"There are {count} functions")

            # a pragma
            print(f"journal_mode={await db.pragma('journal_mode')}")

            # You should always use a transaction - use async with
            async with db:
                await db.execute("INSERT INTO numbers VALUES(7, 'seven')")

                # nested transactions are supported via savepoints
                async with db:
                    await db.execute("DROP TABLE numbers")

            # any exception in the async with block
            # will rollback that block, while successful
            # completion commits the changes

    asyncio.run(basics())

.. code-block:: output

    value=20 name='twenty'
    value=10 name='ten'
    value=3 name='three'
    value=1 name='one'
    There are 174 functions
    journal_mode=memory

.. index:: Functions and callbacks (Async example code)

.. _example_async_callbacks:

Functions and callbacks
-----------------------

Any function/callback can be sync or async.

.. code-block:: python

    # an async scalar function
    async def a_add(one, two):
        print("async scalar called")
        return one + two

    # and a sync scalar function
    def s_add(one, two):
        print("sync scalar called")
        return one + two

    # how about an async update hook?
    async def my_hook(op: int, dbname: str, table: str, rowid: int):
        print(f"update {op=} {dbname=} {table=} {rowid=}")

    async def callbacks():
        db = await apsw.Connection.as_async(":memory:")

        # always close database
        async with contextlib.aclosing(db):
            await db.create_scalar_function("a_add", a_add)
            await db.create_scalar_function("s_add", s_add)

            # use them both in the same query. we need to await execute
            # to get the cursor, and then await the fetchall on the
            # cursor.
            print(
                await (
                    await db.execute("SELECT a_add(1, 2), s_add(3, 4)")
                ).fetchall()
            )

            # A regular callback
            await db.set_update_hook(my_hook)
            await db.execute(
                "CREATE TABLE x(y); INSERT INTO x VALUES(42)"
            )

    # use anyio this time
    anyio.run(callbacks)
.. code-block:: output

    async scalar called
    sync scalar called
    [(3, 7)]
    update op=18 dbname='main' table='x' rowid=1

.. index:: Cancellation (Async example code)

.. _example_async_cancellation:

Cancellation
------------

Often a group of tasks are run at the same time. The frameworks
provide a way to group tasks, wait until all are complete, and if any
fail, then cancel uncompleted ones, and raise the resulting
exceptions.

.. seealso::

    * :external+python:ref:`except *` and :class:`ExceptionGroup` -
      Python syntax for catching multiple exceptions such as from a
      group of tasks
    * :class:`asyncio.TaskGroup`
    * :external+trio:ref:`Trio tasks`
    * :external+anyio:doc:`AnyIO tasks`

This example shows asyncio, but the principles are the same across
all the frameworks.

.. code-block:: python

    async def cancellation():
        # this will block SQLite for the sleep duration
        async def sleep(duration):
            await asyncio.sleep(duration)
            return duration

        async def deliberate_error():
            1 / 0
            return 3

        db = await apsw.Connection.as_async(":memory:")

        # always close database
        async with contextlib.aclosing(db):
            await db.create_scalar_function("sleep", sleep)

            start = time.monotonic()

            # create some tasks in a task group that will all
            # run simultaneously
            try:
                async with asyncio.TaskGroup() as tg:
                    # this query will sleep for an hour
                    task1 = tg.create_task(
                        db.execute("SELECT sleep(3600)")
                    )
                    # this query will only run after the hour sleep query
                    # finishes because we can only do one SQLite query at
                    # a time
                    task2 = tg.create_task(
                        db.execute("SELECT * FROM sqlite_schema")
                    )
                    # this will also sleep for an hour
                    task3 = tg.create_task(asyncio.sleep(3600))
                    # this will have an error
                    task4 = tg.create_task(deliberate_error())

                    # the TaskGroup async with block will now run all
                    # the tasks to completion before exiting the block

            # note the * after except which is how you do exception groups
            except* ZeroDivisionError:
                print(
                    f"got zero division error after {time.monotonic() - start:.6f} seconds"
                )

            # Let's see what happened to
            # all the tasks. Note how they are all done (complete)
            # and how all except the deliberate error got cancelled.
            print(f"{task1.done()=} {task1.cancelled()=}")
            print(f"{task2.done()=} {task2.cancelled()=}")
            print(f"{task3.done()=} {task3.cancelled()=}")
            print(f"{task4.done()=} {task4.cancelled()=}")

            # Let's verify SQLite is not still waiting for an hour
            start = time.monotonic()
            functions = await (
                await db.execute(
                    "SELECT COUNT(*) FROM pragma_function_list"
                )
            ).get
            print(
                f"After {time.monotonic() - start:.6f} seconds, "
                f"there are {functions} registered SQLite functions"
            )

    asyncio.run(cancellation())

.. code-block:: output

    got zero division error after 0.000096 seconds
    task1.done()=True task1.cancelled()=True
    task2.done()=True task2.cancelled()=True
    task3.done()=True task3.cancelled()=True
    task4.done()=True task4.cancelled()=False
    After 0.000255 seconds, there are 212 registered SQLite functions

.. index:: Timeouts (Async example code)

.. _example_async_timeout:

Timeouts
--------

This demonstrates timeouts for both async and sync code. The sync SQL
is `the outlandish fractal`__ but with the ``28`` changed to
``800_000`` and would take days to run to completion.

The deadline for async functions is enforced by the async event loop
and tends to be accurate. The deadline for sync functions is based on
SQLite periodically calling the :meth:`progress handler`.

There is a dedicated :attr:`apsw.aio.deadline` which takes priority
for all frameworks. For |trio| and |anyio| their native timeouts are
also supported if :attr:`apsw.aio.deadline` has not been set. The
:attr:`~apsw.aio.deadline` documentation has more details on setting
deadlines for each framework, getting their current time, and
exceptions raised on timeout.
.. seealso::

    * :meth:`asyncio.get_running_loop().time()`
    * :exc:`TimeoutError`
    * :exc:`trio.TooSlowError`
    * :func:`trio.current_time`
    * :func:`trio.current_effective_deadline`
    * :func:`trio.fail_after` :func:`trio.fail_at`
    * :func:`anyio.current_time`
    * :func:`anyio.current_effective_deadline`
    * :func:`anyio.fail_after`

.. code-block:: python

    # The query is not reproduced here but is used when running this
    # example.
    fractal_sql = "outlandish fractal"

    async def timeouts():
        async def sleep(amount):
            await trio.sleep(amount)
            return 42

        db = await apsw.Connection.as_async(":memory:")

        # always close database
        async with contextlib.aclosing(db):
            await db.create_scalar_function("sleep", sleep)

            try:
                # This will work with every framework. Half a second
                # from now.
                with apsw.aio.contextvar_set(
                    apsw.aio.deadline, trio.current_time() + 0.5
                ):
                    start = trio.current_time()
                    await (
                        await db.execute("SELECT sleep(3600)")
                    ).fetchall()
            except trio.TooSlowError:
                end = trio.current_time()
                print(
                    f"Got async function TooSlowError after {end - start:.6f} seconds"
                )

            # With trio and anyio we can use the native framework timeouts
            try:
                with trio.fail_after(0.5):
                    await (await db.execute(fractal_sql)).fetchall()
            except trio.TooSlowError:
                end = trio.current_time()
                print(
                    f"Got sync function TooSlowError after {end - start:.6f} seconds"
                )

    trio.run(timeouts)

.. code-block:: output

    Got async function TooSlowError after 0.501280 seconds
    Got sync function TooSlowError after 1.001975 seconds

.. index:: Worker thread (Async example code)

.. _example_async_worker_thread:

Worker thread
-------------

Async connections work by running the SQLite operations in a
dedicated background thread. You can also run your own code there
which is especially useful if it does many calls before returning a
final result. Use :meth:`Connection.async_run`.

Examples shown include :ref:`schema_upgrade` and getting a text
:ref:`dump`. In the worker thread, the connection is a regular sync
connection.
.. code-block:: python

    def schema_upgrade(db: apsw.Connection):
        # The user_version is a great way of tracking and upgrading
        # the schema. Because this is run in the worker thread it is
        # the normal sync approach.

        # Do everything in a single transaction
        with db:
            # database fresh state
            if db.pragma("user_version") == 0:
                db.execute("""
                    CREATE TABLE products(id, name, sku, price);
                    CREATE TABLE orders(id, product_id, quantity);
                    pragma user_version = 1;
                    """)

            if db.pragma("user_version") == 1:
                db.execute("""
                    ALTER TABLE products ADD COLUMN description;
                    CREATE INDEX orders_idx ON orders(id, product_id);
                    pragma user_version = 2;
                    """)

            # we could raise an exception here if user_version > 2
            # because it means a more recent schema is present than
            # this code understands. Perhaps a version downgrade
            # happened?

    async def worker_thread():
        db = await apsw.Connection.as_async(":memory:")

        # always close database
        async with contextlib.aclosing(db):
            # Do the upgrade
            await db.async_run(schema_upgrade, db)

            # This is doing one operation
            await db.async_run(
                db.execute,
                "INSERT INTO products(id, name) VALUES(?,?)",
                (37, "Banana"),
            )

            # Getting a result
            rows = await db.async_run(
                lambda: db.execute("SELECT COUNT(*) FROM products").get
            )
            print(f"{rows=}")

            # get a dump - to a memory file here, but you'd want to
            # use a real file
            out = io.StringIO()
            await db.async_run(
                lambda: apsw.shell.Shell(
                    db=db, stdout=out
                ).process_command(".dump")
            )
            dump = out.getvalue()
            print(f"Dump is {len(dump)} chars starting {repr(dump):.40}")

            # Some stuff from apsw.ext
            usage = await db.async_run(apsw.ext.analyze_pages, db, 2)
            details = await db.async_run(
                apsw.ext.query_info, db, "SELECT * FROM sqlite_schema"
            )

    anyio.run(worker_thread)

.. code-block:: output

    rows=1
    Dump is 747 chars starting "-- SQLite dump (by APSW 3.52.0.0)\n-- S

.. index:: Virtual tables (Async example code)

.. _example_async_vtable:

Virtual tables
--------------

:ref:`Virtual tables` are a very good match for async, especially if
networking is involved.

You can do your own :class:`full implementation` using sync and async
methods as needed, but you will find it easier to start with
:func:`apsw.ext.make_virtual_module` which turns any Python function
(sync or async) into a virtual table accepting positional and keyword
arguments.

The sync :ref:`example is here` with the async below being pretty
much the same thing.

.. code-block:: python

    async def data_table(flags, server="example.com"):
        # one positional and one keyword argument. note you can't
        # change the number of columns or their order based on the
        # arguments but you can change what rows are returned and
        # their contents
        print(f"data_table called with {flags=} and {server=}")

        # you would do real work here - we just provide some books
        yield ("The Great Gatsby", 1925, 9.2)
        yield ("To Kill a Mockingbird", 1960, 9.5)
        yield ("1984", 1949, 9.8)
        yield ("The Catcher in the Rye", 1951, 8.4)
        yield ("The Hobbit", 1937, 9.6)

    # Tell make_virtual_module about the columns
    data_table.columns = ("title", "year", "review")
    # ... and how to extract them from each row
    data_table.column_access = apsw.ext.VTColumnAccess.By_Index

    async def virtual_tables():
        db = await apsw.Connection.as_async(":memory:")

        # always close database
        async with contextlib.aclosing(db):
            await apsw.ext.make_virtual_module(db, "books", data_table)

            # regular query
            async for row in await db.execute(
                "SELECT * FROM books WHERE flags=94 AND server=?",
                ("example2.com",),
            ):
                print(row)

            # SQLite will handle the review filter and the ordering
            async for row in await db.execute(
                "SELECT * FROM books WHERE server=? AND flags=? "
                "AND review > 9.55 ORDER BY year DESC",
                ("orange", -2),
            ):
                print(row)

    trio.run(virtual_tables)
.. code-block:: output

    data_table called with flags=94 and server='example2.com'
    ('The Great Gatsby', 1925, 9.2)
    ('To Kill a Mockingbird', 1960, 9.5)
    ('1984', 1949, 9.8)
    ('The Catcher in the Rye', 1951, 8.4)
    ('The Hobbit', 1937, 9.6)
    data_table called with flags=-2 and server='orange'
    ('1984', 1949, 9.8)
    ('The Hobbit', 1937, 9.6)

.. index:: Tracing in a block (Async example code)

.. _example_async_trace:

Tracing in a block
------------------

This is the same as :ref:`sync tracing in a block` adapted to use
``async with`` for :class:`apsw.ext.Trace` and transaction control.

.. code-block:: python

    async def tracing():
        db = await apsw.Connection.as_async(":memory:")

        # always close database
        async with contextlib.aclosing(db):
            # Use None instead of stdout and no information is
            # printed or gathered
            async with apsw.ext.Trace(
                sys.stdout,
                db=db,
                vtable=True,
                updates=True,
                transaction=True,
            ):
                # APSW does a savepoint behind the scenes to wrap
                # the block
                async with db:
                    # Some regular SQL
                    await db.execute("create table multi(x)")

                    # executemany runs the same statement repeatedly
                    await db.executemany(
                        "insert into multi values(?)",
                        ((x,) for x in range(5)),
                    )

                    # See how many rows were processed
                    await (
                        await db.execute("select * from multi limit 2")
                    ).fetchall()

                    # You can also see how many rows were changed
                    await db.execute("delete from multi where x < 4")

                # pragma functions are virtual tables - see how many
                # rows this processes even though only one has 'pow'
                await (
                    await db.execute(
                        "SELECT narg FROM pragma_function_list WHERE name='pow'"
                    )
                ).get

                # trigger that causes rollback
                await db.execute("""
                    create trigger error after insert on multi
                    begin
                        update multi set rowid=100+new.rowid
                            where rowid=new.rowid;
                        select raise(rollback, 'nope');
                    end;
                    """)

                with contextlib.suppress(apsw.ConstraintError):
                    await db.execute("insert into multi values(54)")

    asyncio.run(tracing())
.. code-block:: output

    > BEGIN DEFERRED
    Time: 0.000
    !BEGIN
    > create table multi(x)
    Time: 0.000
    > insert into multi values(?)
    INS 1 (0)
    Time: 0.000 Changes: 1
    > insert into multi values(?)
    INS 2 (1)
    Time: 0.000 Changes: 1
    > insert into multi values(?)
    INS 3 (2)
    Time: 0.000 Changes: 1
    > insert into multi values(?)
    INS 4 (3)
    Time: 0.000 Changes: 1
    > insert into multi values(?)
    INS 5 (4)
    Time: 0.000 Changes: 1
    > select * from multi limit 2
    Time: 0.000 Rows: 3
    > delete from multi where x < 4
    DEL 1 (0)
    DEL 2 (1)
    DEL 3 (2)
    DEL 4 (3)
    Time: 0.000 Changes: 4
    > COMMIT
    !COMMIT
    Time: 0.000
    > SELECT narg FROM pragma_function_list WHERE name='pow'
    V PRAGMA function_list
    Time: 0.000 Rows: 211 VmStep: 1,481 Mem: 76.8KB
    < SELECT narg FROM pragma_function_list WHERE name='pow'
    Time: 0.000 Rows: 1 VmStep: 644
    !BEGIN
    > create trigger error after insert on multi begin ...
    !COMMIT
    Time: 0.000
    !BEGIN
    > insert into multi values(54)
    INS 6 (54)
    T TRIGGER error
    UPD 6>106 (...)
    !ROLLBACK
    < insert into multi values(54)
    Time: 0.001 Changes: 1

.. index:: Resource usage in a block (Async example code)

.. _example_async_resource:

Resource usage in a block
-------------------------

The async equivalent of :ref:`the sync example`.

.. code-block:: python

    # The standard outlandish fractal example which is used when
    # running but not reproduced here.
    query = "fractal"

    async def resource_usage():
        db = await apsw.Connection.as_async(":memory:")

        # always close database
        async with contextlib.aclosing(db):
            print("thread (async event loop)")
            async with apsw.ext.ShowResourceUsage(
                sys.stdout, db=db, scope="thread"
            ):
                # some SQLite work
                await (await db.execute(query)).get
                # and take some wall clock time
                await trio.sleep(0.5)

            print("\nprocess (including background SQLite worker)")
            async with apsw.ext.ShowResourceUsage(
                sys.stdout, db=db, scope="process"
            ):
                # some SQLite work
                await (await db.execute(query)).get
                # and take some wall clock time
                await trio.sleep(0.5)

    trio.run(resource_usage)
.. code-block:: output

    thread (async event loop)
    Process CPU consumption                    0.016
    Wall clock                                 0.517
    Voluntary context switches                 4
    SQLite full table scan                     1,365
    SQLite sort operations                     2
    SQLite vm operations                       1,015,353
    SQLite statements completed                1
    SQLite lookaside slots used                11
    SQLite allocations using lookaside         17,414
    SQLite allocations too big for lookaside   1
    SQLite allocations lookaside full          282
    SQLite statement memory                    17,080

    process (including background SQLite worker)
    Process CPU consumption                    0.015
    Wall clock                                 0.516
    Page faults - no I/O                       3
    Involuntary context switches               63
    Voluntary context switches                 8
    CPU time in user mode                      0.015
    SQLite full table scan                     1,365
    SQLite sort operations                     2
    SQLite vm operations                       1,015,353
    SQLite statements completed                1
    SQLite allocations using lookaside         17,272

.. index:: Blob (Async example code)

.. _example_async_blob:

Blob
----

Async :doc:`blob` operations run in the SQLite worker thread. See the
:ref:`sync example` of which this is a direct async translation.

.. code-block:: python

    async def blob():
        db = await apsw.Connection.as_async(":memory:")

        # always close database
        async with contextlib.aclosing(db):
            await db.execute("create table blobby(x,y)")

            # Add a blob we will fill in later
            await db.execute(
                "insert into blobby values(1, zeroblob(10000))"
            )
            # Or as a binding
            await db.execute(
                "insert into blobby values(2, ?)", (apsw.zeroblob(20000),)
            )

            # Open a blob for writing. We need to know the rowid
            rowid = await (
                await db.execute("select ROWID from blobby where x=1")
            ).get
            blob = await db.blob_open("main", "blobby", "y", rowid, True)

            await blob.write(b"hello world")
            # seeking is immediate (no await)
            blob.seek(2000)
            await blob.read(24)
            # seek relative to the end
            blob.seek(-32, 2)
            await blob.write(b"hello world, again")

            # it will be automatically closed when the connection is
            # closed, but explicitly closing chooses transaction
            # boundaries
            await blob.aclose()

    anyio.run(blob)

.. index:: Backup (Async example code)
.. _example_async_backup:

Backup
------

:doc:`Backups` run in the SQLite worker thread of the async
destination database. The source can be a sync or async database.
You do backups by getting the backup object from the destination
database, telling it about the source using :meth:`Connection.backup`.

If the destination is sync and you are working with an async source,
you can run the backup in the async source thread as demonstrated
below.

.. code-block:: python

    async def backup():
        # Setup source and destinations
        async_source = await apsw.Connection.as_async("")
        # ... and fill it with a large amount of data
        await async_source.execute(
            "CREATE TABLE x(y); INSERT INTO x VALUES(randomblob(250000))"
        )

        sync_source = apsw.Connection("")
        sync_source.execute(
            "CREATE TABLE x(y); INSERT INTO x VALUES(randomblob(250000))"
        )

        async_dest = await apsw.Connection.as_async("")
        sync_dest = apsw.Connection("")

        print("async destination, async source")
        async with await async_dest.backup(
            "main", async_source, "main"
        ) as backup:
            while not backup.done:
                await backup.step(42)
                print(
                    f"page_count = {backup.page_count} remaining = {backup.remaining}"
                )

        print("async destination, sync source")
        async with await async_dest.backup(
            "main", sync_source, "main"
        ) as backup:
            while not backup.done:
                await backup.step(42)
                print(
                    f"page_count = {backup.page_count} remaining = {backup.remaining}"
                )

        print("sync destination, async source")

        # we will run this in the async source thread
        def do_backup():
            with sync_dest.backup("main", async_source, "main") as backup:
                while not backup.done:
                    backup.step(42)
                    print(
                        f"page_count = {backup.page_count} remaining = {backup.remaining}"
                    )

        await async_source.async_run(do_backup)

        # ensure connections get closed
        await async_source.aclose()
        await async_dest.aclose()

    asyncio.run(backup())
.. code-block:: output

    async destination, async source
    page_count = 63 remaining = 21
    page_count = 63 remaining = 0
    async destination, sync source
    page_count = 63 remaining = 21
    page_count = 63 remaining = 0
    sync destination, async source
    page_count = 63 remaining = 21
    page_count = 63 remaining = 0

.. index:: Full Text Search (Async example code)

.. _example_async_fts:

Full Text Search
----------------

:class:`~apsw.fts5.Table` accesses the database for virtually all
methods and attributes, so using the
:ref:`worker thread <example_async_worker_thread>` is needed. A
subset of the :doc:`example-fts` is shown.

.. code-block:: python

    async def fts():
        db = await apsw.Connection.as_async("recipes.db")

        # always close database
        async with contextlib.aclosing(db):
            if not await db.table_exists("main", "search"):
                search_table: apsw.fts5.Table = await db.async_run(
                    apsw.fts5.Table.create,
                    db,
                    "search",
                    content="recipes",
                    columns=None,
                    generate_triggers=True,
                    tokenize=[
                        "simplify",
                        "casefold",
                        "true",
                        "strip",
                        "true",
                        "unicodewords",
                    ],
                )
            else:
                search_table: apsw.fts5.Table = await db.async_run(
                    apsw.fts5.Table, db, "search"
                )

            # property access
            print(
                "row_count =",
                await db.async_run(getattr, search_table, "row_count"),
            )

            # we need to do search processing in the worker thread
            def search_processing(query: str, limit: int):
                matches = []
                for match in search_table.search(query):
                    matches.append(match)
                    if len(matches) >= limit:
                        break
                return matches

            for match in await db.async_run(
                search_processing, "lemon OR guava", 10
            ):
                pprint(match)
                break

            print(
                "First match name is",
                await db.async_run(
                    search_table.row_by_id, match.rowid, "name"
                ),
            )

            # query suggestion
            query = "nyme:(minced OR oyl NOT peenut)"
            print(
                query,
                "=>",
                await db.async_run(search_table.query_suggest, query),
            )

    asyncio.run(fts())
.. code-block:: output

    row_count = 173278
    MatchInfo(query_info=QueryInfo(phrases=(('lemon',), ('guava',))), rowid=1642796043941632884, column_size=[3, 24, 5], phrase_columns=[[1], [0, 2]])
    First match name is P.L's Guava Jelly
    nyme:(minced OR oyl NOT peenut) => name: (minced OR oil NOT peanut)

.. index:: Session (Async example code)

.. _example_async_session:

Session
-------

Use :func:`apsw.aio.make_session` to create the :class:`~apsw.Session`
object in async mode from an async connection.

.. code-block:: python

    async def session_example():
        db = await apsw.Connection.as_async(":memory:")

        # always close database
        async with contextlib.aclosing(db):
            await db.execute("CREATE TABLE x(y PRIMARY KEY, z)")

            session = await apsw.aio.make_session(db, "main")

            # We'd like size estimates
            session.config(apsw.SQLITE_SESSION_OBJCONFIG_SIZE, True)

            # all tables
            await session.attach()

            # add some data
            await db.executemany(
                "INSERT INTO x VALUES(?,?)",
                ((i, "a" * i) for i in range(200)),
            )

            print(f"Size estimate {session.changeset_size}")

            changeset = await session.changeset()
            print(f"Actual size {len(changeset)}")

            # Other than apply, changeset operations don't use a
            # Connection so we'll use trio's mechanism to do invert
            # in a background thread.
            undo = await trio.to_thread.run_sync(
                apsw.Changeset.invert, changeset
            )

            # Undo the changes
            await apsw.Changeset.apply(undo, db)

    trio.run(session_example)
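The Session example above offloads the CPU-bound invert to a thread
with ``trio.to_thread.run_sync``. The standard library has a direct
counterpart in :func:`asyncio.to_thread`, and anyio offers
``anyio.to_thread.run_sync``, so the pattern carries across all three
frameworks. Below is a minimal stdlib-only sketch of the same idea;
``blocking_invert`` is a made-up stand-in for any blocking call (it is
not part of APSW):

```python
import asyncio
import time


def blocking_invert(data: bytes) -> bytes:
    # stand-in for a CPU-bound, blocking call such as inverting a
    # changeset - here it just sleeps briefly and reverses the bytes
    time.sleep(0.05)
    return bytes(reversed(data))


async def main() -> bytes:
    # asyncio.to_thread runs the blocking function in a worker
    # thread so the event loop stays free to service other tasks
    return await asyncio.to_thread(blocking_invert, b"changeset")


result = asyncio.run(main())
print(result)
```

The event loop remains responsive for the duration of the blocking
call, which is exactly why the Session example hands
``apsw.Changeset.invert`` to a thread rather than calling it inline.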