Example/Tour

This code demonstrates usage of APSW. It gives you a good overview of all the things that can be done. Also included is output so you can see what gets printed when you run the code.

There are also specific examples in the documentation for classes, functions, and attributes.

#!/usr/bin/env python3

# This code uses Python's optional typing annotations.  You can
# ignore them and do not need to use them.  If you do use them
# then you must include this future annotations line first.
from __future__ import annotations

from typing import Optional, Iterator, Any

import os
import sys
import time
import apsw
import apsw.ext
import random
import re
from pathlib import Path

Checking APSW and SQLite versions

# Where the extension module is on the filesystem
print("      Using APSW file", apsw.__file__)

# From the extension
print("         APSW version", apsw.apsw_version())

# From the sqlite header file at APSW compile time
print("SQLite header version", apsw.SQLITE_VERSION_NUMBER)

# The SQLite code running
print("   SQLite lib version", apsw.sqlite_lib_version())

# If True then SQLite is incorporated into the extension.
# If False then a shared library is being used, or static linking
print("   Using amalgamation", apsw.using_amalgamation)
      Using APSW file /space/apsw/apsw/__init__.cpython-311-x86_64-linux-gnu.so
         APSW version 3.45.2.0
SQLite header version 3045002
   SQLite lib version 3.45.2
   Using amalgamation True
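
The header version above is a single integer while the library version is dotted text. SQLite encodes the integer as major * 1000000 + minor * 1000 + patch, so the two can be compared after decoding. A minimal sketch; decode_version_number is a hypothetical helper name and is not part of APSW.

```python
from __future__ import annotations


def decode_version_number(number: int) -> tuple[int, int, int]:
    "Split SQLite's integer version (X*1000000 + Y*1000 + Z) into (X, Y, Z)"
    major, rest = divmod(number, 1_000_000)
    minor, patch = divmod(rest, 1_000)
    return major, minor, patch


# 3045002 is the header version shown in the output above
version = decode_version_number(3045002)
version_text = ".".join(str(part) for part in version)
print(version_text)
```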

Best Practice

Use apsw.bestpractice to prevent common SQLite mistakes and get the best performance.

import apsw.bestpractice

apsw.bestpractice.apply(apsw.bestpractice.recommended)

Logging

It is a good idea to get SQLite’s logs as you will get more information about errors. Best practice also includes this. apsw.ext.log_sqlite() forwards SQLite’s log messages to the logging module.

apsw.ext.log_sqlite()

# You can also write to SQLite's log
apsw.log(apsw.SQLITE_ERROR, "A message from Python")

Opening the database

You open the database by using Connection

# Default will create the database if it doesn't exist
connection = apsw.Connection("dbfile")

# Open existing read-only
connection = apsw.Connection("dbfile", flags=apsw.SQLITE_OPEN_READONLY)

# Open existing read-write (exception if it doesn't exist)
connection = apsw.Connection("dbfile", flags=apsw.SQLITE_OPEN_READWRITE)

Executing SQL

Use Connection.execute() to execute SQL

connection.execute("create table point(x,y,z)")
connection.execute("insert into point values(1, 2, 3)")
# You can use multiple ; separated statements
connection.execute("""
    insert into point values(4, 5, 6);
    create table log(timestamp, event);
    create table foo(a, b, c);
    create table important(secret, data);
""")

# read rows
for row in connection.execute("select * from point"):
    print(row)
(1, 2, 3)
(4, 5, 6)

Why you use bindings to provide values

It is tempting to compose strings with the values in them, but it is easy to mangle the query, especially if values contain punctuation and unicode. When the values come from someone hostile this is known as SQL injection. Bindings are the correct way to supply values to queries.

# a simple value
event = "system started"
# DO NOT DO THIS
query = f"insert into log values(0, '{ event }')"
print("query:", query)

# BECAUSE ... a bad guy could provide a value like this
event = "bad guy here') ; drop table important; -- comment"
# which has effects like this
query = f"insert into log values(0, '{ event }')"
print("bad guy:", query)
query: insert into log values(0, 'system started')
bad guy: insert into log values(0, 'bad guy here') ; drop table important; -- comment')
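
For contrast, the sketch below supplies the same hostile value as a binding and checks that it ends up stored as plain data. It is an illustration under assumptions: it uses Python's standard library sqlite3 module rather than APSW so that it runs anywhere, and the table layout simply mirrors the tables created earlier.

```python
import sqlite3  # standard library, used only so the sketch runs without APSW

db = sqlite3.connect(":memory:")
db.execute("create table log(timestamp, event)")
db.execute("create table important(secret, data)")

# the hostile value from the example above
event = "bad guy here') ; drop table important; -- comment"

# supplied as a binding, the value is never parsed as SQL
db.execute("insert into log values(?, ?)", (0, event))

stored = db.execute("select event from log").fetchone()[0]
tables = [name for (name, ) in db.execute(
    "select name from sqlite_master where type='table' order by name")]
print(stored == event, tables)
```

The important table still exists afterwards, and the text read back is identical to the text supplied.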

Bindings (sequence)

Bindings can be provided as a sequence such as with a tuple or list. Use ? to show where the values go.

query = "insert into log values(?, ?)"
data = (7, "transmission started")
connection.execute(query, data)

# You can also use numbers after the ? to select
# values from the sequence.  Note that numbering
# starts at 1
query = "select ?1, ?3, ?2"
data = ("alpha", "beta", "gamma")
for row in connection.execute(query, data):
    print(row)
('alpha', 'gamma', 'beta')

Bindings (dict)

You can also supply bindings with a dictionary. Use :NAME, @NAME, or $NAME, to provide the key name in the query. Names are case sensitive.

query = "insert into point values(:x, @Y, $z)"
data = {"x": 7, "Y": 8, "z": 9}
connection.execute(query, data)

Transactions

By default each statement is its own transaction. A transaction finishes by flushing data to storage and waiting for the operating system to confirm it is permanently there (ie will survive a power failure) which takes a while.

# 3 separate transactions
connection.execute("insert into point values(2, 2, 2)")
connection.execute("insert into point values(3, 3, 3)")
connection.execute("insert into point values(4, 4, 4)")

# You can use BEGIN / COMMIT to manually make a transaction
connection.execute("BEGIN")
connection.execute("insert into point values(2, 2, 2)")
connection.execute("insert into point values(3, 3, 3)")
connection.execute("insert into point values(4, 4, 4)")
connection.execute("COMMIT")

# Or use `with` that does it automatically
with connection:
    connection.execute("insert into point values(2, 2, 2)")
    connection.execute("insert into point values(3, 3, 3)")
    connection.execute("insert into point values(4, 4, 4)")

# Nested transactions are supported
with connection:
    connection.execute("insert into point values(2, 2, 2)")
    with connection:
        connection.execute("insert into point values(3, 3, 3)")
        connection.execute("insert into point values(4, 4, 4)")
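
SQLite provides nesting through savepoints. The sketch below spells out by hand roughly what a nested block arranges, including undoing only the inner level. It is an assumption-laden illustration: it uses the standard library sqlite3 module in autocommit mode rather than APSW, and the savepoint names are made up.

```python
import sqlite3

# isolation_level=None stops the sqlite3 module issuing its own BEGIN
db = sqlite3.connect(":memory:", isolation_level=None)
db.execute("create table point(x, y, z)")

db.execute("SAVEPOINT outer_work")
db.execute("insert into point values(2, 2, 2)")

db.execute("SAVEPOINT inner_work")
db.execute("insert into point values(3, 3, 3)")
# undo only the inner level, then close it
db.execute("ROLLBACK TO inner_work")
db.execute("RELEASE inner_work")

# releasing the outermost savepoint commits
db.execute("RELEASE outer_work")

rows = db.execute("select * from point").fetchall()
print(rows)
```

Only the row inserted at the outer level survives, because the inner level was rolled back before the outer one was released.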

executemany

You can execute the same SQL against a sequence using Connection.executemany()

data = (
    (1, 1, 1),
    (2, 2, 2),
    (3, 3, 3),
    (4, 4, 4),
    (5, 5, 5),
)
query = "insert into point values(?,?,?)"

# we do it in a transaction
with connection:
    # the query is run for each item in data
    connection.executemany(query, data)

Pragmas

SQLite has a wide variety of pragmas to control the database configuration and library behaviour. See the Tips for maintaining your schema.

# WAL mode is good for write performance
connection.pragma("journal_mode", "wal")

# Foreign keys are off by default, so turn them on
connection.pragma("foreign_keys", True)

# You can use this to see if any other connection (including other processes) has
# changed the database
connection.pragma("data_version")

# Useful at startup to detect some database corruption
check = connection.pragma("integrity_check")
if check != "ok":
    print("Integrity check errors", check)

Tracing execution

You can trace execution of SQL statements. See more about tracing.

def my_tracer(cursor: apsw.Cursor, statement: str, bindings: Optional[apsw.Bindings]) -> bool:
    "Called just before executing each statement"
    print("SQL:", statement.strip())
    print("Bindings:", bindings)
    return True  # if you return False then execution is aborted


# you can trace a single cursor
cursor = connection.cursor()
cursor.exec_trace = my_tracer
cursor.execute(
    """
        drop table if exists bar;
        create table bar(x,y,z);
        select * from point where x=?;
        """, (3, ))

# if set on a connection then all cursors are traced
connection.exec_trace = my_tracer
# and clearing it
connection.exec_trace = None
SQL: drop table if exists bar;
Bindings: ()
SQL: create table bar(x,y,z);
Bindings: ()
SQL: select * from point where x=?;
Bindings: (3,)

Tracing returned rows

You can trace returned rows, including modifying what is returned or skipping it completely. See more about tracing.

def row_tracer(cursor: apsw.Cursor, row: apsw.SQLiteValues) -> apsw.SQLiteValues:
    """Called with each row of results before they are handed off.  You can return None to
    cause the row to be skipped or a different set of values to return"""
    print("Row:", row)
    return row


# you can trace a single cursor
cursor = connection.cursor()
cursor.row_trace = row_tracer
for row in cursor.execute("select x,y from point where x>4"):
    pass

# if set on a connection then all cursors are traced
connection.row_trace = row_tracer
# and clearing it
connection.row_trace = None
Row: (7, 8)
Row: (5, 5)

Defining scalar functions

Scalar functions take zero or more values and return one value. They are registered by calling Connection.create_scalar_function().

def ilove7(*args: apsw.SQLiteValue) -> int:
    "A scalar function"
    print(f"ilove7 got { args } but I love 7")
    return 7


connection.create_scalar_function("seven", ilove7)

for row in connection.execute("select seven(x,y) from point where x>4"):
    print("row", row)
ilove7 got (7, 8) but I love 7
row (7,)
ilove7 got (5, 5) but I love 7
row (7,)

Defining aggregate functions

Aggregate functions are called multiple times with matching rows, and then provide a final value. An example is calculating an average. They are registered by calling Connection.create_aggregate_function().

class longest:
    # Find which value when represented as a string is
    # the longest

    def __init__(self) -> None:
        self.longest = ""

    def step(self, *args: apsw.SQLiteValue) -> None:
        # Called with each matching row
        for arg in args:
            if len(str(arg)) > len(self.longest):
                self.longest = str(arg)

    def final(self) -> str:
        # Called at the very end
        return self.longest


connection.create_aggregate_function("longest", longest)
print(connection.execute("select longest(event) from log").get)
transmission started
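
The order of calls can be seen by driving the class by hand. This is a simplified sketch: run_aggregate is a hypothetical helper that imitates the calls made for one group of rows, and the sample rows are invented.

```python
from __future__ import annotations

from typing import Any, Iterable


class longest:
    "Same aggregate as above: longest value when represented as a string"

    def __init__(self) -> None:
        self.longest = ""

    def step(self, *args: Any) -> None:
        for arg in args:
            if len(str(arg)) > len(self.longest):
                self.longest = str(arg)

    def final(self) -> str:
        return self.longest


def run_aggregate(factory, rows: Iterable[tuple]) -> Any:
    "Hypothetical driver imitating the calls made for one group of rows"
    instance = factory()  # a fresh instance for the group
    for row in rows:
        instance.step(*row)  # once per matching row
    return instance.final()  # once at the end


result = run_aggregate(longest, [("boot", ), ("transmission started", ), ("idle", )])
print(result)
```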

Defining window functions

A window function's input values come from a “window” around a row of interest. Four methods are called as the window moves to add, remove, get the current value, and finalize.

An example is calculating an average of values in the window to compare to the row. They are registered by calling Connection.create_window_function().

This is the Python equivalent to the C based example in the SQLite documentation

class SumInt:

    def __init__(self):
        self.v = 0

    def step(self, arg):
        print("step", arg)
        self.v += arg

    def inverse(self, arg):
        print("inverse", arg)
        self.v -= arg

    def final(self):
        print("final", self.v)
        return self.v

    def value(self):
        print("value", self.v)
        return self.v


connection.create_window_function("sumint", SumInt)

for row in connection.execute("""
        CREATE TABLE t3(x, y);
        INSERT INTO t3 VALUES('a', 4),
                             ('b', 5),
                             ('c', 3),
                             ('d', 8),
                             ('e', 1);
        -- Use the window function
        SELECT x, sumint(y) OVER (
        ORDER BY x ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
        ) AS sum_y
        FROM t3 ORDER BY x;
    """):
    print("ROW", row)
step 4
step 5
value 9
ROW ('a', 9)
step 3
value 12
ROW ('b', 12)
inverse 4
step 8
value 16
ROW ('c', 16)
inverse 5
step 1
value 12
ROW ('d', 12)
inverse 3
value 9
ROW ('e', 9)
final 9
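
The sequence of step, inverse, and value calls in the output can be reproduced without a database. The sketch below is only an imitation for this one frame (ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING): run_window is a hypothetical driver, and SQLite itself may order calls differently for other frames.

```python
class SumInt:
    "Same window function as above, without the print calls"

    def __init__(self):
        self.v = 0

    def step(self, arg):
        self.v += arg

    def inverse(self, arg):
        self.v -= arg

    def value(self):
        return self.v

    def final(self):
        return self.v


def run_window(factory, values, preceding=1, following=1):
    "Hypothetical driver for ROWS BETWEEN preceding PRECEDING AND following FOLLOWING"
    instance = factory()
    results = []
    low = high = 0  # the window currently holds values[low:high]
    for i in range(len(values)):
        # remove rows that are now before the start of the frame
        while low < max(i - preceding, 0):
            instance.inverse(values[low])
            low += 1
        # add rows that have come into the end of the frame
        while high < min(i + following + 1, len(values)):
            instance.step(values[high])
            high += 1
        results.append(instance.value())
    instance.final()
    return results


sums = run_window(SumInt, [4, 5, 3, 8, 1])
print(sums)
```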

Defining collations (sorting)

How you sort can depend on the languages or values involved. You register a collation by calling Connection.create_collation().

# This example sorting mechanism understands some text followed by a
# number and ensures the number portion gets sorted correctly

connection.execute("create table names(name)")
connection.executemany("insert into names values(?)", (
    ("file1", ),
    ("file7", ),
    ("file17", ),
    ("file20", ),
    ("file3", ),
))

print("Standard sorting")
for row in connection.execute("select * from names order by name"):
    print(row)


def str_num_collate(s1: apsw.SQLiteValue, s2: apsw.SQLiteValue) -> int:
    # return -1 if s1<s2, +1 if s1>s2 else 0 for equal

    def parts(s: str) -> list:
        "Converts str into list of alternating str and int parts"
        return [int(v) if v.isdigit() else v
                  for v in re.split(r"(\d+)", s)]

    ps1 = parts(str(s1))
    ps2 = parts(str(s2))

    # compare
    if ps1 < ps2:
        return -1
    if ps1 > ps2:
        return 1
    return 0


connection.create_collation("strnum", str_num_collate)

print("\nUsing strnum")
for row in connection.execute("select * from names order by name collate strnum"):
    print(row)
Standard sorting
('file1',)
('file17',)
('file20',)
('file3',)
('file7',)

Using strnum
('file1',)
('file3',)
('file7',)
('file17',)
('file20',)
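
A collation is an ordinary three-way comparison, so it can be tried out in plain Python before registering it. This sketch repeats the comparison above and hands it to sorted() through functools.cmp_to_key; the list of names is the same test data.

```python
import functools
import re


def parts(s: str) -> list:
    "Converts str into list of alternating str and int parts"
    return [int(v) if v.isdigit() else v for v in re.split(r"(\d+)", s)]


def str_num_collate(s1: str, s2: str) -> int:
    # return -1 if s1<s2, +1 if s1>s2 else 0 for equal
    ps1, ps2 = parts(s1), parts(s2)
    if ps1 < ps2:
        return -1
    if ps1 > ps2:
        return 1
    return 0


names = ["file1", "file7", "file17", "file20", "file3"]
ordered = sorted(names, key=functools.cmp_to_key(str_num_collate))
print(ordered)
```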

Accessing results by column name

You can access results by column name using dataclasses. APSW provides apsw.ext.DataClassRowFactory to do this.

import apsw.ext

connection.execute("""
    create table books(id, title, author, year);
    insert into books values(7, 'Animal Farm', 'George Orwell', 1945);
    insert into books values(37, 'The Picture of Dorian Gray', 'Oscar Wilde', 1890);
    """)

# Normally you use column numbers
for row in connection.execute("select title, id, year from books where author=?", ("Oscar Wilde", )):
    # this is very fragile
    print("title", row[0])
    print("id", row[1])
    print("year", row[2])

# Turn on dataclasses - frozen makes them read-only
connection.row_trace = apsw.ext.DataClassRowFactory(dataclass_kwargs={"frozen": True})

print("\nNow with dataclasses\n")

# Same query - note using AS to set column name
for row in connection.execute(
        """SELECT title,
           id AS book_id,
           year AS book_year
           FROM books WHERE author = ?""", ("Oscar Wilde", )):
    print("title", row.title)
    print("id", row.book_id)
    print("year", row.book_year)

# clear
connection.row_trace = None
title The Picture of Dorian Gray
id 37
year 1890

Now with dataclasses

title The Picture of Dorian Gray
id 37
year 1890
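
The idea of a read-only row with named fields can be seen with the standard dataclasses module on its own. This is a sketch of the concept, not a description of how DataClassRowFactory is implemented; the Row class name is made up, and the column names are the ones set with AS in the query above.

```python
import dataclasses

# column names as they would come from the query
columns = ("title", "book_id", "book_year")

# frozen=True makes instances read-only
Row = dataclasses.make_dataclass("Row", columns, frozen=True)

row = Row("The Picture of Dorian Gray", 37, 1890)
print(row.title, row.book_id, row.book_year)

read_only = False
try:
    row.book_year = 1891
except dataclasses.FrozenInstanceError:
    # assignment is refused because the dataclass is frozen
    read_only = True
print("read-only:", read_only)
```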

Type conversion into/out of database

You can use apsw.ext.TypesConverterCursorFactory to do conversion, both for types you define and for other types.

import apsw.ext

registrar = apsw.ext.TypesConverterCursorFactory()
connection.cursor_factory = registrar


# A type we define - deriving from SQLiteTypeAdapter automatically registers conversion
# to a SQLite value
class Point(apsw.ext.SQLiteTypeAdapter):

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __repr__(self) -> str:
        return f"Point({ self.x }, { self.y })"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, type(self)) and self.x == other.x and self.y == other.y

    def to_sqlite_value(self) -> str:
        # called to convert Point into something SQLite supports
        return f"{ self.x };{ self.y }"

    # This converter will be registered
    @classmethod
    def convert_from_sqlite(cls, value: str) -> Point:
        return cls(*(float(part) for part in value.split(";")))


# An existing type
def complex_to_sqlite_value(c: complex) -> str:
    return f"{ c.real }+{ c.imag }"


# ... requires manual registration
registrar.register_adapter(complex, complex_to_sqlite_value)

# conversion from a SQLite value requires registration
registrar.register_converter("POINT", Point.convert_from_sqlite)


# ... and for complex
def sqlite_to_complex(v: str) -> complex:
    return complex(*(float(part) for part in v.split("+")))


registrar.register_converter("COMPLEX", sqlite_to_complex)

# note that the type names are case sensitive and must match the
# registration
connection.execute("create table conversion(p POINT, c COMPLEX)")

# convert going into database
test_data = (Point(5.2, 7.6), 3 + 4j)
connection.execute("insert into conversion values(?, ?)", test_data)
print("inserted", test_data)

# and coming back out
for row in connection.execute("select * from conversion"):
    print("back out", row)
    print("equal", row == test_data)

# clear registrar
connection.cursor_factory = apsw.Cursor
inserted (Point(5.2, 7.6), (3+4j))
back out (Point(5.2, 7.6), (3+4j))
equal True
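
The + separator used for complex above is enough for the test data, but Python writes large floats with an exponent such as 1e+20, which puts an extra + into the text. One possible alternative, sketched here with hypothetical function names, is to store repr() of the value and parse it back with complex().

```python
def complex_to_text(c: complex) -> str:
    "repr gives text like '(3+4j)' that complex() can parse back"
    return repr(c)


def text_to_complex(text: str) -> complex:
    return complex(text)


samples = [3 + 4j, 1e20 + 0j, -2.5 - 1e-7j]
round_tripped = [text_to_complex(complex_to_text(c)) for c in samples]
print(round_tripped == samples)

# the '+' separated form gains an extra '+' from the exponent
big = 1e20 + 0j
naive = f"{ big.real }+{ big.imag }"
print(naive, len(naive.split("+")))
```

These two functions could be passed to register_adapter and register_converter in place of the ones above.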

Query details

apsw.ext.query_info() can provide a lot of information about a query (without running it)

import apsw.ext

# test tables
connection.execute("""
    create table customers(
        id INTEGER PRIMARY KEY,
        name CHAR,
        address CHAR);
    create table orders(
        id INTEGER PRIMARY KEY,
        customer_id INTEGER,
        item MY_OWN_TYPE);
    create index cust_addr on customers(address);
""")

query = """
    SELECT * FROM orders
    JOIN customers ON orders.customer_id=customers.id
    WHERE address = ?;
    SELECT 7;"""
bindings = ("123 Main Street", )

# ask for all information available
qd = apsw.ext.query_info(
    connection,
    query,
    bindings=bindings,
    actions=True,  # which tables/views etc and how they are accessed
    expanded_sql=True,  # expands bindings into query string
    explain=True,  # shows low level VDBE
    explain_query_plan=True,  # how SQLite solves the query
)

# help with formatting
import pprint

print("query", qd.query)
print("\nbindings", qd.bindings)
print("\nexpanded_sql", qd.expanded_sql)
print("\nfirst_query", qd.first_query)
print("\nquery_remaining", qd.query_remaining)
print("\nis_explain", qd.is_explain)
print("\nis_readonly", qd.is_readonly)
print("\ndescription\n", pprint.pformat(qd.description))
if hasattr(qd, "description_full"):
    print("\ndescription_full\n", pprint.pformat(qd.description_full))

print("\nquery_plan\n", pprint.pformat(qd.query_plan))
print("\nFirst 5 actions\n", pprint.pformat(qd.actions[:5]))
print("\nFirst 5 explain\n", pprint.pformat(qd.explain[:5]))
query
    SELECT * FROM orders
    JOIN customers ON orders.customer_id=customers.id
    WHERE address = ?;
    SELECT 7;

bindings ('123 Main Street',)

expanded_sql
    SELECT * FROM orders
    JOIN customers ON orders.customer_id=customers.id
    WHERE address = '123 Main Street';

first_query
    SELECT * FROM orders
    JOIN customers ON orders.customer_id=customers.id
    WHERE address = ?;


query_remaining SELECT 7;

is_explain 0

is_readonly True

description
 (('id', 'INTEGER'),
 ('customer_id', 'INTEGER'),
 ('item', 'MY_OWN_TYPE'),
 ('id', 'INTEGER'),
 ('name', 'CHAR'),
 ('address', 'CHAR'))

description_full
 (('id', 'INTEGER', 'main', 'orders', 'id'),
 ('customer_id', 'INTEGER', 'main', 'orders', 'customer_id'),
 ('item', 'MY_OWN_TYPE', 'main', 'orders', 'item'),
 ('id', 'INTEGER', 'main', 'customers', 'id'),
 ('name', 'CHAR', 'main', 'customers', 'name'),
 ('address', 'CHAR', 'main', 'customers', 'address'))

query_plan
 QueryPlan(detail='QUERY PLAN',
          sub=[QueryPlan(detail='SCAN orders', sub=None),
               QueryPlan(detail='SEARCH customers USING INTEGER PRIMARY KEY '
                                '(rowid=?)',
                         sub=None)])

First 5 actions
 [QueryAction(action=21,
             action_name='SQLITE_SELECT',
             column_name=None,
             database_name=None,
             file_name=None,
             function_name=None,
             module_name=None,
             operation=None,
             pragma_name=None,
             pragma_value=None,
             table_name=None,
             trigger_name=None,
             trigger_or_view=None,
             view_name=None),
 QueryAction(action=20,
             action_name='SQLITE_READ',
             column_name='id',
             database_name='main',
             file_name=None,
             function_name=None,
             module_name=None,
             operation=None,
             pragma_name=None,
             pragma_value=None,
             table_name='orders',
             trigger_name=None,
             trigger_or_view=None,
             view_name=None),
 QueryAction(action=20,
             action_name='SQLITE_READ',
             column_name='customer_id',
             database_name='main',
             file_name=None,
             function_name=None,
             module_name=None,
             operation=None,
             pragma_name=None,
             pragma_value=None,
             table_name='orders',
             trigger_name=None,
             trigger_or_view=None,
             view_name=None),
 QueryAction(action=20,
             action_name='SQLITE_READ',
             column_name='item',
             database_name='main',
             file_name=None,
             function_name=None,
             module_name=None,
             operation=None,
             pragma_name=None,
             pragma_value=None,
             table_name='orders',
             trigger_name=None,
             trigger_or_view=None,
             view_name=None),
 QueryAction(action=20,
             action_name='SQLITE_READ',
             column_name='id',
             database_name='main',
             file_name=None,
             function_name=None,
             module_name=None,
             operation=None,
             pragma_name=None,
             pragma_value=None,
             table_name='customers',
             trigger_name=None,
             trigger_or_view=None,
             view_name=None)]

First 5 explain
 [VDBEInstruction(addr=0,
                 opcode='Init',
                 comment=None,
                 p1=0,
                 p2=17,
                 p3=0,
                 p4=None,
                 p5=0),
 VDBEInstruction(addr=1,
                 opcode='OpenRead',
                 comment=None,
                 p1=0,
                 p2=12,
                 p3=0,
                 p4='3',
                 p5=0),
 VDBEInstruction(addr=2,
                 opcode='OpenRead',
                 comment=None,
                 p1=1,
                 p2=11,
                 p3=0,
                 p4='3',
                 p5=0),
 VDBEInstruction(addr=3,
                 opcode='Rewind',
                 comment=None,
                 p1=0,
                 p2=16,
                 p3=0,
                 p4=None,
                 p5=0),
 VDBEInstruction(addr=4,
                 opcode='Column',
                 comment=None,
                 p1=0,
                 p2=1,
                 p3=1,
                 p4=None,
                 p5=0)]

Blob I/O

BLOBs (binary large objects) are supported by SQLite. Note that you cannot change the size of one, but you can allocate one filled with zeroes, and then later open it and read / write the contents similar to a file, without having the entire blob in memory. Use Connection.blob_open() to open a blob.

connection.execute("create table blobby(x,y)")
# Add a blob we will fill in later
connection.execute("insert into blobby values(1, zeroblob(10000))")
# Or as a binding
connection.execute("insert into blobby values(2, ?)", (apsw.zeroblob(20000), ))
# Open a blob for writing.  We need to know the rowid
rowid = connection.execute("select ROWID from blobby where x=1").get
blob = connection.blob_open("main", "blobby", "y", rowid, True)
blob.write(b"hello world")
blob.seek(2000)
blob.read(24)
# seek relative to the end
blob.seek(-32, 2)
blob.write(b"hello world, again")
blob.close()

Backup an open database

You can backup a database that is open. The pages are copied in batches of your choosing and allow continued use of the source database.

# We will copy a disk database into this memory database
destination = apsw.Connection(":memory:")

# Copy into destination
with destination.backup("main", connection, "main") as backup:
    # The source database can change while doing the backup
    # and the backup will still pick up those changes
    while not backup.done:
        backup.step(7)  # copy up to 7 pages each time
        # monitor progress
        print(backup.remaining, backup.page_count)
15 22
8 22
1 22
0 22

Authorizer (control what SQL can do)

You can allow, deny, or ignore what SQL does. Use Connection.authorizer to set an authorizer.

def auth(operation: int, p1: Optional[str], p2: Optional[str], db_name: Optional[str],
         trigger_or_view: Optional[str]) -> int:
    """Called when each operation is prepared.  We can return SQLITE_OK, SQLITE_DENY or
    SQLITE_IGNORE"""
    # find the operation name
    print(apsw.mapping_authorizer_function[operation], p1, p2, db_name, trigger_or_view)
    if operation == apsw.SQLITE_CREATE_TABLE and p1 and p1.startswith("private"):
        return apsw.SQLITE_DENY  # not allowed to create tables whose names start with private

    return apsw.SQLITE_OK  # always allow


connection.authorizer = auth
connection.execute("insert into names values('foo')")
connection.execute("select name from names limit 1")
try:
    connection.execute("create table private_stuff(secret)")
    print("Created secret table!")
except Exception as e:
    print(e)

# Clear authorizer
connection.authorizer = None
SQLITE_INSERT names None main None
SQLITE_SELECT None None None None
SQLITE_READ names name main None
SQLITE_INSERT sqlite_master None main None
SQLITE_CREATE_TABLE private_stuff None main None
AuthError: not authorized

Progress handler

Some queries (eg joins, sorting) can take many operations to complete. Register a progress handler callback with Connection.set_progress_handler() which lets you provide feedback and allows cancelling.

# create a table with random numbers
with connection:
    connection.execute("create table numbers(x)")
    connection.executemany("insert into numbers values(?)",
                            ((random.randint(0, 9999999999), )
                                for _ in range(100)))


def progress_handler() -> bool:
    print("progress handler called")
    return False  # returning True aborts


# register handler every 50 vdbe instructions
connection.set_progress_handler(progress_handler, 50)

# Sorting the numbers to find the biggest
for max_num in connection.execute("select max(x) from numbers"):
    print(max_num)

# Clear handler
connection.set_progress_handler(None)
progress handler called
progress handler called
progress handler called
progress handler called
progress handler called
progress handler called
progress handler called
progress handler called
(9909188373,)

File Control

We can get/set low level information using the Connection.file_control() interface. In this example we get the data version. There is a pragma but it doesn’t change for commits on the same connection.

# We use ctypes to provide the correct C level data types and pointers
import ctypes


def get_data_version(db):
    # unsigned 32 bit integer
    data_version = ctypes.c_uint32(0)
    ok = db.file_control(
        "main",  # or an attached database name
        apsw.SQLITE_FCNTL_DATA_VERSION,  # code
        ctypes.addressof(data_version))  # pass C level pointer
    assert ok, "SQLITE_FCNTL_DATA_VERSION was not understood!"
    return data_version.value


# Show starting values
print("fcntl", get_data_version(connection),
      "pragma", connection.pragma("data_version"))

# See the fcntl value versus pragma value
for sql in (
        "create table fcntl_example(x)",
        "begin ; insert into fcntl_example values(3)",
        # we can see the version doesn't change inside a transaction
        "insert into fcntl_example values(4)",
        "commit",
        "pragma user_version=1234",
):
    print(sql)
    connection.execute(sql)
    print("fcntl", get_data_version(connection),
          "pragma", connection.pragma("data_version"))
fcntl 40 pragma 2
create table fcntl_example(x)
fcntl 41 pragma 2
begin ; insert into fcntl_example values(3)
fcntl 41 pragma 2
insert into fcntl_example values(4)
fcntl 41 pragma 2
commit
fcntl 42 pragma 2
pragma user_version=1234
fcntl 43 pragma 2
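
The ctypes pattern in get_data_version can be seen in isolation. In this sketch Python plays the part of the C code: it is handed only the integer address and stores a value there, which the original ctypes object then reports. The value 41 is arbitrary.

```python
import ctypes

# unsigned 32 bit integer owned by Python, starting at zero
data_version = ctypes.c_uint32(0)

# a plain int holding the memory address, as passed to file_control
address = ctypes.addressof(data_version)

# stand-in for the C side: view the same memory and store a value there
ctypes.c_uint32.from_address(address).value = 41

print(data_version.value)
```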

Commit hook

A commit hook can allow or veto commits. Register a commit hook with Connection.set_commit_hook().

def my_commit_hook() -> bool:
    print("in commit hook")
    hour = time.localtime()[3]
    if hour >= 8 and hour < 18:
        print("commits okay at this time")
        return False  # let commit go ahead
    print("no commits out of hours")
    return True  # abort commits outside of 8am through 6pm


connection.set_commit_hook(my_commit_hook)
try:
    with connection:
        connection.execute("""create table example(x,y,z);
                           insert into example values (3,4,5)""")
except apsw.ConstraintError:
    print("commit was not allowed")

connection.set_commit_hook(None)
in commit hook
commits okay at this time
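
Note the direction of the return value: True vetoes the commit. Separating the decision from the clock makes it possible to check every hour without waiting for it. A small sketch; should_abort_commit is a hypothetical helper using the same 8am through 6pm rule.

```python
def should_abort_commit(hour: int) -> bool:
    "Hypothetical helper: True (abort) outside of 8am through 6pm"
    return not (8 <= hour < 18)


# check the boundaries on both sides
decisions = {hour: should_abort_commit(hour) for hour in (7, 8, 17, 18)}
print(decisions)
```

A commit hook could then return should_abort_commit(time.localtime()[3]).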

Update hook

Update hooks let you know that data has been added, changed, or removed. For example you could use this to discard cached information. Register a hook using Connection.set_update_hook().

def my_update_hook(type: int, db_name: str, table_name: str, rowid: int) -> None:
    op: str = apsw.mapping_authorizer_function[type]
    print(f"Updated: { op } db { db_name }, table { table_name }, rowid { rowid }")


connection.set_update_hook(my_update_hook)
connection.execute("insert into names values(?)", ("file93", ))
connection.execute("update names set name=? where name=?", ("file94", "file93"))
connection.execute("delete from names where name=?", ("file94", ))

# Clear the hook
connection.set_update_hook(None)
Updated: SQLITE_INSERT db main, table names, rowid 7
Updated: SQLITE_UPDATE db main, table names, rowid 7
Updated: SQLITE_DELETE db main, table names, rowid 7
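
Discarding cached information could look something like the sketch below. TableCache and its methods are hypothetical names; only the hook signature is taken from my_update_hook above. The hook is called by hand here instead of by a connection, and it ignores db_name for simplicity.

```python
from __future__ import annotations

from typing import Any


class TableCache:
    "Hypothetical cache of query results, grouped by the table they came from"

    def __init__(self) -> None:
        self.by_table: dict[str, dict[str, Any]] = {}

    def store(self, table_name: str, key: str, value: Any) -> None:
        self.by_table.setdefault(table_name, {})[key] = value

    def lookup(self, table_name: str, key: str) -> Any:
        return self.by_table.get(table_name, {}).get(key)

    def update_hook(self, type: int, db_name: str, table_name: str, rowid: int) -> None:
        "Same signature as my_update_hook: forget everything cached for that table"
        self.by_table.pop(table_name, None)


cache = TableCache()
cache.store("names", "count", 6)
cache.store("books", "count", 2)

# 18 is the value of SQLITE_INSERT
cache.update_hook(18, "main", "names", 7)

print(cache.lookup("names", "count"), cache.lookup("books", "count"))
```

The bound method cache.update_hook is what would be handed to Connection.set_update_hook().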

Virtual tables

Virtual tables let you provide data on demand as a SQLite table so you can use SQL queries against that data. Writing your own virtual table requires understanding how to return less than all the data via the BestIndex method.

You can export a Python function as a virtual table in 3 lines of code using apsw.ext.make_virtual_module(), being able to provide both positional and keyword arguments.

For the first example you’ll find apsw.ext.generate_series() useful instead.

# Yield a row at a time
def table_range(start=1, stop=100, step=1):
    for i in range(start, stop + 1, step):
        yield (i, )

# set column names
table_range.columns = ("value", )
# set how to access what table_range returns
table_range.column_access = apsw.ext.VTColumnAccess.By_Index

# register it
apsw.ext.make_virtual_module(connection, "range", table_range)

# see it work.  we can provide both positional and keyword
# arguments
query = "SELECT * FROM range(90) WHERE step=2"
print(apsw.ext.format_query_table(connection, query))
┌───────┐
│ value │
│ 90    │
│ 92    │
│ 94    │
│ 96    │
│ 98    │
│ 100   │
└───────┘

# the parameters are hidden columns so '*' doesn't select them
# but you can ask
query = "SELECT *, start, stop, step FROM range(89) WHERE step=3"
print(apsw.ext.format_query_table(connection, query))
┌───────┬───────┬──────┬──────┐
│ value │ start │ stop │ step │
│ 89    │ 89    │ 100  │ 3    │
│ 92    │ 89    │ 100  │ 3    │
│ 95    │ 89    │ 100  │ 3    │
│ 98    │ 89    │ 100  │ 3    │
└───────┴───────┴──────┴──────┘
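
Because table_range is an ordinary generator, the two queries can be compared with direct calls. In this sketch the value in parentheses corresponds to the positional argument and the WHERE constraint on the hidden step column corresponds to a keyword argument.

```python
def table_range(start=1, stop=100, step=1):
    "Same generator as above"
    for i in range(start, stop + 1, step):
        yield (i, )


# SELECT * FROM range(90) WHERE step=2
first = [value for (value, ) in table_range(90, step=2)]
# SELECT * FROM range(89) WHERE step=3
second = [value for (value, ) in table_range(89, step=3)]
print(first)
print(second)
```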
# Expose the unicode database.
import unicodedata

# A more complex example exporting unicodedata module

# The methods we will call on each codepoint
unicode_methods = ("name", "decimal", "digit", "numeric", "category", "combining",
                   "bidirectional", "east_asian_width", "mirrored", "decomposition")

# the function we will turn into a virtual table returning
# each row as a dict
def unicode_data(start=0, stop=sys.maxunicode):

    # some methods raise ValueError on some codepoints
    def call(meth: str, c: str):
        try:
            return getattr(unicodedata, meth)(c)
        except ValueError:
            return None

    for c in range(start, stop + 1):
        yield {k: call(k, chr(c)) for k in unicode_methods}


# setup column names and access
unicode_data.columns = unicode_methods
unicode_data.column_access = apsw.ext.VTColumnAccess.By_Name

# register
apsw.ext.make_virtual_module(connection, "unicode_data", unicode_data)

# how many codepoints are in each category?
query = """
    SELECT count(*), category FROM unicode_data
       WHERE stop = 0xffff  -- BMP only
       GROUP BY category
       ORDER BY category
       LIMIT 10"""
print(apsw.ext.format_query_table(connection, query))
┌──────────┬──────────┐
│ count(*) │ category │
│ 65       │ Cc       │
│ 43       │ Cf       │
│ 1456     │ Cn       │
│ 6400     │ Co       │
│ 2048     │ Cs       │
│ 1445     │ Ll       │
│ 236      │ Lm       │
│ 46126    │ Lo       │
│ 31       │ Lt       │
│ 1127     │ Lu       │
└──────────┴──────────┘
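
The category counts above can be cross-checked without SQLite. This is a minimal sketch in plain Python, not part of the APSW example. It tallies the same unicodedata.category values over the Basic Multilingual Plane. Counts for some categories depend on the Unicode version bundled with your Python, so they may differ from the output shown.

```python
import collections
import unicodedata

# Tally general categories for codepoints 0 through 0xffff, the same
# range the SQL query selects with stop = 0xffff
counts = collections.Counter(
    unicodedata.category(chr(c)) for c in range(0, 0xFFFF + 1))

# Same ordering and limit as the SQL: first 10 categories by name
for category in sorted(counts)[:10]:
    print(counts[category], category)
```

The virtual table lets SQL do the grouping, ordering and limiting that the Counter and sorted() calls do here.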
# A more complex example - given a list of directories return information
# about the files within them recursively
def get_files_info(directories: str,
                   sep: str = os.pathsep,
                   *,
                   ignore_symlinks: bool = True) -> Iterator[dict[str, Any]]:
    for root in directories.split(sep):
        with os.scandir(root) as sd:
            for entry in sd:
                if entry.is_symlink() and ignore_symlinks:
                    continue
                if entry.is_dir():
                    yield from get_files_info(os.path.join(root, entry.name), ignore_symlinks=ignore_symlinks)
                elif entry.is_file():
                    s = entry.stat()
                    yield {
                        "directory": root,
                        "name": entry.name,
                        "extension": os.path.splitext(entry.name)[1],
                        **{
                            k: getattr(s, k)
                            for k in get_files_info.stat_columns
                        }
                    }


# which stat columns do we want?
get_files_info.stat_columns = tuple(n for n in dir(os.stat(".")) if n.startswith("st_"))
# setup columns and access by providing an example of the first entry returned
get_files_info.columns, get_files_info.column_access = \
    apsw.ext.get_column_names(next(get_files_info(".")))

apsw.ext.make_virtual_module(connection, "files_info", get_files_info)

# all the sys.path directories
bindings = (os.pathsep.join(p for p in sys.path if os.path.isdir(p)
            #  except our current one
            and not os.path.samefile(p, ".")), )

# Find the 3 biggest files that aren't libraries
query = """SELECT st_size, directory, name
            FROM files_info(?)
            WHERE extension NOT IN ('.a', '.so')
            ORDER BY st_size DESC
            LIMIT 3"""
print(apsw.ext.format_query_table(connection, query, bindings))
┌─────────┬────────────────────────────────────────────────┬───────────────────┐
│ st_size │                   directory                    │       name        │
├─────────┼────────────────────────────────────────────────┼───────────────────┤
│ 1645145 │ /space/apsw/tools/.mypy_cache/3.11             │ builtins.data.jso │
│         │                                                │ n                 │
├─────────┼────────────────────────────────────────────────┼───────────────────┤
│ 757129  │ /usr/lib/python3.11/pydoc_data                 │ topics.py         │
├─────────┼────────────────────────────────────────────────┼───────────────────┤
│ 717967  │ /space/apsw/tools/.mypy_cache/3.11/collections │ __init__.data.jso │
│         │                                                │ n                 │
└─────────┴────────────────────────────────────────────────┴───────────────────┘
# Find the 3 biggest Python files, showing st_ctime as a date
query = """SELECT DATE(st_ctime, 'auto') AS date, directory, name
            FROM files_info(?)
            WHERE extension='.py'
            ORDER BY st_size DESC
            LIMIT 3"""
print(apsw.ext.format_query_table(connection, query, bindings))
┌────────────┬────────────────────────────────┬───────────────┐
│    date    │           directory            │     name      │
│ 2023-10-10 │ /usr/lib/python3.11/pydoc_data │ topics.py     │
│ 2023-10-10 │ /usr/lib/python3.11            │ _pydecimal.py │
│ 2023-10-06 │ /usr/lib/python3.11/tkinter    │ __init__.py   │
└────────────┴────────────────────────────────┴───────────────┘
# find space used by filename extension
query = """SELECT extension, SUM(st_size) as total_size
            FROM files_info(?)
            GROUP BY extension
            ORDER BY extension"""
print(apsw.ext.format_query_table(connection, query, bindings))
┌────────────┬────────────┐
│ extension  │ total_size │
│            │ 384164     │
│ .TAG       │ 190        │
│ .a         │ 113404210  │
│ .allowlist │ 56         │
│ .bootstrap │ 1756       │
│ .c         │ 11116      │
│ .cfg       │ 341        │
│ .csh       │ 935        │
│ .css       │ 1325       │
│ .fish      │ 2215       │
│ .in        │ 3504       │
│ .json      │ 7834037    │
│ .local     │ 1534       │
│ .o         │ 20696      │
│ .ps1       │ 9033       │
│ .py        │ 11816118   │
│ .pyc       │ 13288659   │
│ .rst       │ 9561       │
│ .sh        │ 3941       │
│ .so        │ 27575368   │
│ .stdlib    │ 10752      │
│ .supp      │ 70         │
│ .txt       │ 23425      │
└────────────┴────────────┘
# unregister a virtual table by passing None
connection.create_module("files_info", None)

VFS - Virtual File System

VFS lets you control how SQLite accesses storage. APSW makes it easy to “inherit” from an existing VFS and monitor or alter data as it flows through.

URIs are shown as a way to receive parameters when opening or creating a database file, and pragmas as a way to receive parameters once a database is open.
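
As a rough illustration of what such a URI carries, the sketch below splits one with Python's urllib.parse. This is only an approximation for illustration. SQLite does its own URI parsing, and inside a VFS you would use the apsw.URIFilename methods shown in the example that follows.

```python
from urllib.parse import parse_qsl, urlsplit

# Same shape as the URI used to open the obfuscated database
uri = "file:myobfudb?fast=speed&level=7&warp=on&another=true"

parts = urlsplit(uri)
# Parameters come after the ? and are separated by &
params = dict(parse_qsl(parts.query))

print(parts.path)
print(params)
```

All values arrive as text, which is why the VFS example asks for an integer or boolean interpretation explicitly.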

# This example VFS obfuscates the database file contents by XORing
# every byte with 0xa5.

def obfuscate(data: bytes):
    return bytes([x ^ 0xa5 for x in data])


# Inheriting from a base of "" means the default vfs
class ObfuscatedVFS(apsw.VFS):

    def __init__(self, vfsname="obfuscated", basevfs=""):
        self.vfs_name = vfsname
        self.base_vfs = basevfs
        super().__init__(self.vfs_name, self.base_vfs)

    # We want to return our own file implementation, but also
    # want it to inherit
    def xOpen(self, name, flags):
        in_flags = []
        for k, v in apsw.mapping_open_flags.items():
            if isinstance(k, int) and flags[0] & k:
                in_flags.append(v)
        print("xOpen flags", " | ".join(in_flags))

        if isinstance(name, apsw.URIFilename):
            print("   uri filename", name.filename())
            # We can look at uri parameters
            print("   fast is", name.uri_parameter("fast"))
            print("   level is", name.uri_int("level", 3))
            print("   warp is", name.uri_boolean("warp", False))
            print("   notpresent is", name.uri_parameter("notpresent"))
            # all of them
            print("   all uris", name.parameters)
        else:
            print("   filename", name)
        return ObfuscatedVFSFile(self.base_vfs, name, flags)


# The file implementation where we override xRead and xWrite to call our
# obfuscation routine
class ObfuscatedVFSFile(apsw.VFSFile):

    def __init__(self, inheritfromvfsname, filename, flags):
        super().__init__(inheritfromvfsname, filename, flags)

    def xRead(self, amount, offset):
        return obfuscate(super().xRead(amount, offset))

    def xWrite(self, data, offset):
        super().xWrite(obfuscate(data), offset)

    def xFileControl(self, op: int, ptr: int) -> bool:
        if op != apsw.SQLITE_FCNTL_PRAGMA:
            return super().xFileControl(op, ptr)
        # implement our own pragma
        p = apsw.VFSFcntlPragma(ptr)
        print(f"pragma received { p.name } = { p.value }")
        # what do we understand?
        if p.name == "my_custom_pragma":
            p.result = "orange"
            return True
        # We did not understand
        return False

# To register the VFS we just instantiate it
obfuvfs = ObfuscatedVFS()

# Let's see which VFS are now available
print("VFS available", apsw.vfs_names())

# Make an obfuscated db, passing in some URI parameters
# default open flags
open_flags = apsw.SQLITE_OPEN_READWRITE | apsw.SQLITE_OPEN_CREATE
# add in using URI parameters
open_flags |= apsw.SQLITE_OPEN_URI

# uri parameters are after the ? separated by &
obfudb = apsw.Connection("file:myobfudb?fast=speed&level=7&warp=on&another=true",
                         flags=open_flags,
                         vfs=obfuvfs.vfs_name)

# Check it works
obfudb.execute("create table foo(x,y); insert into foo values(1,2)")

# Check it really is obfuscated on disk
print("What is on disk", repr(Path("myobfudb").read_bytes()[:20]))

# And unobfuscating it
print("Unobfuscated disk", repr(obfuscate(Path("myobfudb").read_bytes()[:20])))

# Custom pragma
print("pragma returned", obfudb.pragma("my_custom_pragma", "my value"))

# Tidy up
obfudb.close()
os.remove("myobfudb")
VFS available ['unix', 'obfuscated', 'memdb', 'unix-excl', 'unix-dotfile', 'unix-none']
xOpen flags SQLITE_OPEN_CREATE | SQLITE_OPEN_MAIN_DB | SQLITE_OPEN_READWRITE | SQLITE_OPEN_URI
   uri filename /space/apsw/myobfudb
   fast is speed
   level is 7
   warp is True
   notpresent is None
   all uris ('fast', 'level', 'warp', 'another')
pragma received journal_mode = wal
xOpen flags SQLITE_OPEN_CREATE | SQLITE_OPEN_MAIN_JOURNAL | SQLITE_OPEN_READWRITE
   filename /space/apsw/myobfudb-journal
pragma received foreign_keys = ON
xOpen flags SQLITE_OPEN_CREATE | SQLITE_OPEN_READWRITE | SQLITE_OPEN_WAL
   filename /space/apsw/myobfudb-wal
pragma received integrity_check = foo
What is on disk b'\xf6\xf4\xe9\xcc\xd1\xc0\x85\xc3\xca\xd7\xc8\xc4\xd1\x85\x96\xa5\xb5\xa5\xa7\xa7'
Unobfuscated disk b'SQLite format 3\x00\x10\x00\x02\x02'
pragma received my_custom_pragma = my value
pragma returned orange
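
The same obfuscate function serves both xRead and xWrite because XOR with a fixed byte is its own inverse. The sketch below is plain Python with no APSW involved. It applies the routine to the standard SQLite file header and checks that a second application restores it.

```python
def obfuscate(data: bytes) -> bytes:
    # XOR every byte with 0xa5, as in the VFS example
    return bytes(x ^ 0xA5 for x in data)


# The first 16 bytes of every SQLite database file
header = b"SQLite format 3\x00"

on_disk = obfuscate(header)
print("obfuscated", on_disk)

# Applying the routine a second time gives back the original bytes
restored = obfuscate(on_disk)
print("restored  ", restored)
```

The obfuscated bytes match the start of the "What is on disk" output above. This hides content from casual inspection only and is not encryption.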

Limits

SQLite lets you see and update various limits via Connection.limit()

# Print some limits
for limit in ("LENGTH", "COLUMN", "ATTACHED"):
    name = "SQLITE_LIMIT_" + limit
    max_name = "SQLITE_MAX_" + limit  # compile time limit
    orig = connection.limit(getattr(apsw, name))
    print(name, orig)
    # To get the maximum, set to 0x7fffffff and then read value back
    connection.limit(getattr(apsw, name), 0x7fffffff)
    maximum = connection.limit(getattr(apsw, name))
    print(max_name, " ", maximum)

# Set limit for size of a string
connection.execute("create table testlimit(s)")
connection.execute("insert into testlimit values(?)", ("x" * 1024, ))  # 1024 char string
connection.limit(apsw.SQLITE_LIMIT_LENGTH, 1023)  # limit is now 1023
try:
    connection.execute("insert into testlimit values(?)", ("y" * 1024, ))
    print("string exceeding limit was inserted")
except apsw.TooBigError:
    print("Caught toobig exception")

# reset back to largest value
connection.limit(apsw.SQLITE_LIMIT_LENGTH, 0x7fffffff)
SQLITE_LIMIT_LENGTH 1000000000
SQLITE_MAX_LENGTH   1000000000
SQLITE_LIMIT_COLUMN 2000
SQLITE_MAX_COLUMN   2000
SQLITE_LIMIT_ATTACHED 125
SQLITE_MAX_ATTACHED   125
Caught toobig exception

Shell

APSW includes a shell like the one in SQLite, and it is also extensible from Python.

import apsw.shell

# Here we use the shell to do a csv export and then dump part of the
# database

# Export to a StringIO
import io

output = io.StringIO()
shell = apsw.shell.Shell(stdout=output, db=connection)

# How to execute a dot command
shell.process_command(".mode csv")
shell.process_command(".headers on")

# How to execute SQL
shell.process_sql("""
    create table csvtest(column1, column2 INTEGER);
    create index faster on csvtest(column1);
    insert into csvtest values(3, 4);
    insert into csvtest values('a b', NULL);
""")

# Or let the shell figure out SQL vs dot command
shell.process_complete_line("select * from csvtest")

# see the result
print(output.getvalue())

# reset output - seek only rewinds, truncate discards the old text
output.seek(0)
output.truncate()

# make a dump of the same table
shell.process_command(".dump csvtest%")

# see the result
print("\nDump output\n")
print(output.getvalue())
column1,column2
3,4
a b,


Dump output

-- SQLite dump (by APSW 3.45.2.0)
-- SQLite version 3.45.2
-- Date: Tue Mar 12 10:29:12 2024
-- Tables like: csvtest%
-- Database: /space/apsw/dbfile
-- User: rogerb @ clamps

-- The values of various per-database settings
PRAGMA page_size=4096;
-- PRAGMA encoding='UTF-8';
-- PRAGMA auto_vacuum=NONE;
-- PRAGMA max_page_count=4294967294;

BEGIN TRANSACTION;

-- Table  csvtest
DROP TABLE IF EXISTS csvtest;
CREATE TABLE csvtest(column1, column2 INTEGER);
INSERT INTO csvtest VALUES(3,4);
INSERT INTO csvtest VALUES('a b',NULL);
-- Triggers and indices on  csvtest
CREATE INDEX faster on csvtest(column1);

-- Your database may need this.  It is sometimes used to keep track of the
-- schema version.
pragma user_version=1234;
COMMIT TRANSACTION;
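
When one io.StringIO is reused for several pieces of shell output, note that seek(0) only moves the write position. The sketch below is plain Python with made-up strings and no APSW. It shows the difference between rewinding alone and rewinding followed by truncate().

```python
import io

buffer = io.StringIO()
buffer.write("AAAAAAAA")

# Rewinding alone leaves the old text in place; the next write
# overwrites only as many characters as it supplies
buffer.seek(0)
buffer.write("BB")
overwritten = buffer.getvalue()

# Rewinding and then truncating empties the buffer first
buffer.seek(0)
buffer.truncate()
buffer.write("BB")
emptied = buffer.getvalue()

print(repr(overwritten), repr(emptied))
```

A shorter second output written after seek(0) alone would leave the tail of the first output behind it.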

Statistics

SQLite provides process-wide statistics via apsw.status(). Use Connection.status() for per-connection statistics.

current_usage, max_usage = apsw.status(apsw.SQLITE_STATUS_MEMORY_USED)
print(f"SQLite memory usage { current_usage } max { max_usage }")
schema_used, _ = connection.status(apsw.SQLITE_DBSTATUS_SCHEMA_USED)
print(f"{ schema_used } bytes used to store schema for this connection")
SQLite memory usage 520456 max 2497232
4936 bytes used to store schema for this connection

Tracing

This shows how to use Connection.trace_v2() to get a callback when a statement starts, when it returns a row, and when it finishes.

# From https://www.sqlite.org/lang_with.html
# Outlandish Recursive Query Examples

query = """WITH RECURSIVE
            xaxis(x) AS (VALUES(-2.0) UNION ALL SELECT x+0.05 FROM xaxis WHERE x<1.2),
            yaxis(y) AS (VALUES(-1.0) UNION ALL SELECT y+0.1 FROM yaxis WHERE y<1.0),
            m(iter, cx, cy, x, y) AS (
                SELECT 0, x, y, 0.0, 0.0 FROM xaxis, yaxis
                UNION ALL
                SELECT iter+1, cx, cy, x*x-y*y + cx, 2.0*x*y + cy FROM m
                WHERE (x*x + y*y) < 4.0 AND iter<28
            ),
            m2(iter, cx, cy) AS (
                SELECT max(iter), cx, cy FROM m GROUP BY cx, cy
            ),
            a(t) AS (
                SELECT group_concat( substr(' .+*#', 1+min(iter/7,4), 1), '')
                FROM m2 GROUP BY cy
            )
            SELECT group_concat(rtrim(t),x'0a') FROM a;"""


# pprint formats the trace dictionary for printing
import pprint


def trace_hook(trace: dict) -> None:
    # check the sql and connection are as expected and remove from trace
    # so we don't print them
    assert trace.pop("sql") == query and trace.pop("connection") is connection
    print("code is ", apsw.mapping_trace_codes[trace["code"]])
    print(pprint.pformat(trace), "\n")


connection.trace_v2(apsw.SQLITE_TRACE_STMT
                     | apsw.SQLITE_TRACE_PROFILE
                     | apsw.SQLITE_TRACE_ROW,
                    trace_hook)

# We will get one each of the trace events
for _ in connection.execute(query):
    pass

# Turn off tracing
connection.trace_v2(0, None)
code is  SQLITE_TRACE_STMT
{'code': 1}

code is  SQLITE_TRACE_ROW
{'code': 4}

code is  SQLITE_TRACE_PROFILE
{'code': 2,
 'nanoseconds': 19000000,
 'stmt_status': {'SQLITE_STMTSTATUS_AUTOINDEX': 0,
                 'SQLITE_STMTSTATUS_FILTER_HIT': 0,
                 'SQLITE_STMTSTATUS_FILTER_MISS': 0,
                 'SQLITE_STMTSTATUS_FULLSCAN_STEP': 1365,
                 'SQLITE_STMTSTATUS_MEMUSED': 15784,
                 'SQLITE_STMTSTATUS_REPREPARE': 0,
                 'SQLITE_STMTSTATUS_RUN': 1,
                 'SQLITE_STMTSTATUS_SORT': 2,
                 'SQLITE_STMTSTATUS_VM_STEP': 1015351}}
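
The first argument to trace_v2 is a bit mask, and the 'code' values in the output above (1, 4 and 2) are its individual bits. The sketch below models this with a standard library enum. TraceCode and wanted are hypothetical names used only for illustration. In real code use the apsw.SQLITE_TRACE_* constants and apsw.mapping_trace_codes as shown above. The numeric values mirror SQLite's constants.

```python
import enum


class TraceCode(enum.IntFlag):
    # Values mirror SQLite's SQLITE_TRACE_* constants
    STMT = 0x01
    PROFILE = 0x02
    ROW = 0x04
    CLOSE = 0x08


# The same combination the example registers for
mask = TraceCode.STMT | TraceCode.PROFILE | TraceCode.ROW


def wanted(code: int) -> bool:
    # An event is delivered only if its bit is set in the mask
    return bool(mask & code)


print("mask value", int(mask))
print("events requested", [c.name for c in TraceCode if c in mask])

# The profile event reports elapsed time in nanoseconds
nanoseconds = 19_000_000
print("milliseconds", nanoseconds / 1_000_000)
```

Because CLOSE is not in the mask, closing the connection would not invoke the callback.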

Formatting query results table

apsw.ext.format_query_table() makes it easy to format the results of a query as a table that automatically adjusts its column widths, with options for colour, string sanitizing, truncation etc.

# Create a table with some dummy data
connection.execute(
    """CREATE TABLE dummy(quantity, [spaces in name], last);
    INSERT INTO dummy VALUES(3, 'some regular text to make this row interesting', x'030709');
    INSERT INTO dummy VALUES(3.14, 'Tiếng Việt', null);
    INSERT INTO dummy VALUES('', ?, ' ');
""", ('special \t\n\f\0 cha\\rs', ))

query = "SELECT * FROM dummy"
# default
print(apsw.ext.format_query_table(connection, query))
┌───────┬─────────────────────────────────────────────────────────────┬────────┐
│ quant │                       spaces in name                        │  last  │
│  ity  │                                                             │        │
├───────┼─────────────────────────────────────────────────────────────┼────────┤
│ 3     │ some regular text to make this row interesting              │ [ 3    │
│       │                                                             │ bytes  │
│       │                                                             │ ]      │
├───────┼─────────────────────────────────────────────────────────────┼────────┤
│ 3.14  │ Ti{LATIN SMALL LETTER E WITH CIRCUMFLEX AND ACUTE}ng        │ (null) │
│       │ Vi{LATIN SMALL LETTER E WITH CIRCUMFLEX AND DOT BELOW}t     │        │
├───────┼─────────────────────────────────────────────────────────────┼────────┤
│       │ special                                                     │        │
│       │ \0 cha\\rs                                                  │        │
└───────┴─────────────────────────────────────────────────────────────┴────────┘
# no unicode boxes and maximum sanitizing of the text
kwargs = {"use_unicode": False, "string_sanitize": 2}
print(apsw.ext.format_query_table(connection, query, **kwargs))
+----------+------------------------------------------------+-------------+
| quantity |                 spaces in name                 |    last     |
+----------+------------------------------------------------+-------------+
| 3        | some.regular.text.to.make.this.row.interesting | [ 3 bytes ] |
+----------+------------------------------------------------+-------------+
| 3.14     | Ti.ng.Vi.t                                     | (null)      |
+----------+------------------------------------------------+-------------+
|          | special..                                      | .           |
|          | ...cha\rs                                      |             |
+----------+------------------------------------------------+-------------+
# let's have unicode boxes and make things narrow with no word wrap
kwargs = {"use_unicode": True, "string_sanitize": 0, "text_width": 30, "word_wrap": False}
print(apsw.ext.format_query_table(connection, query, **kwargs))
┌─────┬────────────────┬─────┐
│ qua │ spaces in name │ las │
│ nti │                │  t  │
│ ty  │                │     │
├─────┼────────────────┼─────┤
│ 3   │ some regular t │ [ 3 │
│     │ ext to make th │  by │
│     │ is row interes │ tes │
│     │ ting           │  ]  │
├─────┼────────────────┼─────┤
│ 3.1 │ Tiếng Việt     │ (nu │
│ 4   │                │ ll) │
├─────┼────────────────┼─────┤
│     │ special        │     │
│     │ \0 cha\\rs     │     │
└─────┴────────────────┴─────┘
# have the values in SQL syntax
kwargs = {"quote": True}
print(apsw.ext.format_query_table(connection, query, **kwargs))
┌───────┬──────────────────────────────────────────────────────────────┬───────┐
│ quant │                        spaces in name                        │ last  │
│  ity  │                                                              │       │
├───────┼──────────────────────────────────────────────────────────────┼───────┤
│ 3     │ 'some regular text to make this row interesting'             │ X'030 │
│       │                                                              │ 709'  │
├───────┼──────────────────────────────────────────────────────────────┼───────┤
│ 3.14  │ 'Ti{LATIN SMALL LETTER E WITH CIRCUMFLEX AND ACUTE}ng        │ NULL  │
│       │ Vi{LATIN SMALL LETTER E WITH CIRCUMFLEX AND DOT BELOW}t'     │       │
├───────┼──────────────────────────────────────────────────────────────┼───────┤
│ ''    │ 'special                                                     │ ' '   │
│       │ \0 cha\\rs'                                                  │       │
└───────┴──────────────────────────────────────────────────────────────┴───────┘
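
In the default output above, the accented letters in the Vietnamese text appear as their Unicode names in braces. The sketch below imitates that visible effect with the standard unicodedata module. describe_non_ascii is a hypothetical helper written for this illustration. It is not how apsw.ext implements string_sanitize, which handles more cases.

```python
import unicodedata


def describe_non_ascii(text: str) -> str:
    # Hypothetical helper: ASCII passes through unchanged, anything
    # else becomes {UNICODE NAME}, or {U+XXXX} if it has no name
    out = []
    for ch in text:
        if ch.isascii():
            out.append(ch)
        else:
            out.append("{" + unicodedata.name(ch, f"U+{ord(ch):04X}") + "}")
    return "".join(out)


# Escapes are used so the source stays ASCII; this is 'Tiếng Việt'
print(describe_non_ascii("Ti\u1ebfng Vi\u1ec7t"))
```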

Cleanup

As a general rule you do not need to do any cleanup. Standard Python garbage collection will take care of everything. Even if the process crashes with a connection in the middle of a transaction, the next time SQLite opens that database it will automatically roll back the incomplete transaction.

# You can close connections manually
connection.close()
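
The automatic rollback after a crash can be observed with a small experiment. This sketch uses Python's standard sqlite3 module rather than APSW so that it runs without extra packages, and the table and file names are made up. A child process commits one row, starts a second transaction, and then exits abruptly with os._exit() before committing.

```python
import os
import sqlite3
import subprocess
import sys
import tempfile
import textwrap

# Hypothetical child script: commits one row, then dies mid-transaction
CHILD = textwrap.dedent("""
    import os, sqlite3, sys
    # isolation_level=None means we control transactions ourselves
    con = sqlite3.connect(sys.argv[1], isolation_level=None)
    con.execute("CREATE TABLE t(x)")
    con.execute("INSERT INTO t VALUES ('committed')")
    con.execute("BEGIN")
    con.execute("INSERT INTO t VALUES ('never committed')")
    os._exit(1)  # hard exit: no COMMIT, no close, no Python cleanup
""")

with tempfile.TemporaryDirectory() as tmp:
    dbname = os.path.join(tmp, "crash-demo.db")
    child = subprocess.run([sys.executable, "-c", CHILD, dbname])

    # Open the database again in this process and see what survived
    con = sqlite3.connect(dbname)
    surviving_rows = [row[0] for row in con.execute("SELECT x FROM t")]
    con.close()

print("child exit code", child.returncode)
print("rows after crash", surviving_rows)
```

Only the committed row should be visible afterwards, with no cleanup code having run in the child.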