Session Example/Tour

This example shows using APSW to create and use the Session extension

#!/usr/bin/env python3

# This code uses Python's optional typing annotations.  You can
# ignore them and do not need to use them.  If you do use them
# then you must include this future annotations line first.
from __future__ import annotations

import functools
import pathlib
import tempfile

import apsw
import apsw.ext

Is Session available?

Session must be enabled in SQLite at compile time, and in APSW at its compile time. (PyPI builds always have both enabled)

# Was the SQLite library compiled with the session extension?
print("Session in SQLite:", "ENABLE_SESSION" in apsw.compile_options)

# Was APSW itself compiled with session support?  The Session class
# is only looked for here, not used.
print("  Session in APSW:", hasattr(apsw, "Session"))
Session in SQLite: True
  Session in APSW: True

Initial database setup

The database is populated with an items table, a tags table, and a link table allowing multiple tags per item. It has foreign keys and a trigger which cause changes beyond the supplied SQL so we can see how session handles that. See the full SQL

# An empty filename - SQLite treats this as a private temporary
# database, so nothing is left behind under a name of our choosing.
connection = apsw.Connection("")

# session.sql is read relative to the current working directory, and
# creates and populates the tables used throughout this example.
connection.execute(pathlib.Path("session.sql").read_text())

Monitoring changes

You can have multiple Sessions monitoring changes on a connection. You need to call Session.attach() to say which tables to monitor, or use Session.table_filter() to get a callback. You can pause and resume monitoring with Session.enabled. For unmonitored changes you can use Session.diff() to work out the changes between two tables.

# Monitor the "main" database of the connection
session = apsw.Session(connection, "main")

# enabled by default.  You can set it to False while making
# changes you do not want recorded. It only stops recording
# changes to rows not already part of the changeset.
print(f"{session.enabled=}")

# We'd like size estimates - they are read back later in this
# example through session.changeset_size
session.config(apsw.SQLITE_SESSION_OBJCONFIG_SIZE, True)


# we now say which tables to monitor - no tables are monitored by default.
# The tables must have PRIMARY KEY in their declaration otherwise
# nothing is recorded.
def table_filter(name: str) -> bool:
    """Say whether changes to the table called *name* should be recorded.

    Each table name asked about is printed so the callback can be seen
    in the example output.  Every table is accepted.
    """
    print(f"table_filter {name=}")
    # No table is excluded from monitoring
    monitor_this_table = True
    return monitor_this_table


# We could also have done session.attach() to get all tables
# or attach with named tables of interest.
# Register the callback.  The output shows it being called once for
# each of the three tables changed below.
session.table_filter(table_filter)

# And now make some changes.  We do every kind of change here -
# INSERT, UPDATE, and DELETE.  Only the SQL here is supplied by us -
# the foreign keys and trigger in the schema make further changes,
# which show up marked as indirect when the changeset is displayed.
connection.execute("""
INSERT INTO items(name) VALUES('kitchen walls');
INSERT INTO item_tag_link(item_id, tag_id)  VALUES(
    (SELECT id FROM items WHERE name='kitchen walls'),
    (SELECT id FROM tags WHERE label='paint')
);
INSERT INTO item_tag_link(item_id, tag_id)  VALUES(
    (SELECT id FROM items WHERE name='kitchen walls'),
    (SELECT id FROM tags WHERE label='cleaning')
);

INSERT INTO items(name) VALUES('microwave');

UPDATE items SET description='high gloss' WHERE name='bathroom ceiling';
UPDATE tags SET cost_centre=null WHERE label='new';

DELETE FROM tags WHERE label='battery';
""")

# How much memory is the session using?
print(f"{session.memory_used=}")
session.enabled=True
table_filter name='items'
table_filter name='item_tag_link'
table_filter name='tags'
session.memory_used=7352

SQL equivalent of a changeset

We can iterate the contents of a changeset as SQL statements using apsw.ext.changeset_to_sql(). It needs to know the column names because changesets only use column numbers, so we use apsw.ext.find_columns() giving it the connection to inspect.

def show_changeset(
    title: str,
    contents: apsw.SessionStreamInput,
    connection: apsw.Connection = connection,
):
    """Print *title*, then *contents* as one SQL statement per line.

    *contents* is a changeset or patchset.  *connection* is the
    database inspected to find column names, and defaults to the
    example's main connection.  A blank line is printed at the end.
    """
    print(title)

    # Changesets only use column numbers, so the SQL generator is given
    # a lookup that is bound to the connection supplying the names.
    column_lookup = functools.partial(
        apsw.ext.find_columns, connection=connection
    )
    sql_statements = apsw.ext.changeset_to_sql(
        contents, get_columns=column_lookup
    )
    for sql in sql_statements:
        print(sql)

    print()

Patchsets and Changesets

Changesets contain all the before and after values for changed rows, while patchsets only contain the necessary values to make the change. apsw.ext.changeset_to_sql() is useful to see what SQL a change or patch set is equivalent to.

# The smaller form - only the values necessary to make each change
patchset = session.patchset()
print(f"{len(patchset)=}")

show_changeset("patchset", patchset)

# Note how the changeset is larger and contains more information -
# the before values appear as extra conditions in the WHERE clauses
changeset = session.changeset()
print(f"{len(changeset)=}")

show_changeset("changeset", changeset)
len(patchset)=241
patchset
UPDATE items SET description='high gloss' WHERE id = 'F9B19B';
INSERT INTO items(id, name, description) VALUES ('3E3865', 'kitchen walls', NULL);
INSERT INTO items(id, name, description) VALUES ('B9B16B', 'microwave', NULL);
/* indirect */ DELETE FROM item_tag_link WHERE item_id = '4579B7' AND tag_id = '00BF23';
INSERT INTO item_tag_link(item_id, tag_id, reason) VALUES ('3E3865', '3E8EC2', NULL);
INSERT INTO item_tag_link(item_id, tag_id, reason) VALUES ('3E3865', 'A0339B', NULL);
/* indirect */ INSERT INTO item_tag_link(item_id, tag_id, reason) VALUES ('3E3865', 'BF1D0D', 'system');
/* indirect */ INSERT INTO item_tag_link(item_id, tag_id, reason) VALUES ('B9B16B', 'BF1D0D', 'system');
UPDATE tags SET cost_centre=NULL WHERE id = 'BF1D0D';
DELETE FROM tags WHERE id = '00BF23';

len(changeset)=274
changeset
UPDATE items SET description='high gloss' WHERE id = 'F9B19B' AND description IS NULL;
INSERT INTO items(id, name, description) VALUES ('3E3865', 'kitchen walls', NULL);
INSERT INTO items(id, name, description) VALUES ('B9B16B', 'microwave', NULL);
/* indirect */ DELETE FROM item_tag_link WHERE item_id = '4579B7' AND tag_id = '00BF23' AND reason IS NULL;
INSERT INTO item_tag_link(item_id, tag_id, reason) VALUES ('3E3865', '3E8EC2', NULL);
INSERT INTO item_tag_link(item_id, tag_id, reason) VALUES ('3E3865', 'A0339B', NULL);
/* indirect */ INSERT INTO item_tag_link(item_id, tag_id, reason) VALUES ('3E3865', 'BF1D0D', 'system');
/* indirect */ INSERT INTO item_tag_link(item_id, tag_id, reason) VALUES ('B9B16B', 'BF1D0D', 'system');
UPDATE tags SET cost_centre=NULL WHERE id = 'BF1D0D' AND cost_centre = 100;
DELETE FROM tags WHERE id = '00BF23' AND label = 'battery' AND cost_centre = 300;

Inverting - undo, redo

We can get the opposite of a changeset which can then form the basis of an undo/redo implementation. One pattern is to have a table where you store changesets allowing for a later undo or redo.

# Yes, it is this easy
undo = apsw.Changeset.invert(changeset)

# Compare this to the changeset above, to see how it does the
# opposite.
show_changeset("undo", undo)
undo
UPDATE items SET description=NULL WHERE id = 'F9B19B' AND description = 'high gloss';
DELETE FROM items WHERE id = '3E3865' AND name = 'kitchen walls' AND description IS NULL;
DELETE FROM items WHERE id = 'B9B16B' AND name = 'microwave' AND description IS NULL;
/* indirect */ INSERT INTO item_tag_link(item_id, tag_id, reason) VALUES ('4579B7', '00BF23', NULL);
DELETE FROM item_tag_link WHERE item_id = '3E3865' AND tag_id = '3E8EC2' AND reason IS NULL;
DELETE FROM item_tag_link WHERE item_id = '3E3865' AND tag_id = 'A0339B' AND reason IS NULL;
/* indirect */ DELETE FROM item_tag_link WHERE item_id = '3E3865' AND tag_id = 'BF1D0D' AND reason = 'system';
/* indirect */ DELETE FROM item_tag_link WHERE item_id = 'B9B16B' AND tag_id = 'BF1D0D' AND reason = 'system';
UPDATE tags SET cost_centre=100 WHERE id = 'BF1D0D' AND cost_centre IS NULL;
INSERT INTO tags(id, label, cost_centre) VALUES ('00BF23', 'battery', 300);

Applying changesets

We can filter which tables get affected when applying a changeset (default all) and can define a conflict handler (default abort the transaction). Conflicts are described here. We are going to undo our earlier changes.

# However it is going to fail ...
try:
    apsw.Changeset.apply(undo, connection)
except apsw.AbortError as exc:
    # It will fail, and the database back in the state when we started
    # the apply.
    print(f"Exception {exc=}")

# The reason it failed is because of the foreign keys automatically
# removing rows in the link table when members of items and tags got
# removed.  Lets do it again, but save the failed changes for
# inspection.

failed = apsw.ChangesetBuilder()

# And make some deliberate conflicts
connection.execute("""
    UPDATE items SET description = 'Orange flavour' WHERE name = 'bathroom ceiling';
    -- Refuse deletes to make constraint failure on delete
    CREATE TRIGGER prevent_microwave_deletion
    BEFORE DELETE ON items
        FOR EACH ROW
        WHEN OLD.name = 'microwave'
        BEGIN
            SELECT RAISE(ABORT, 'Cannot delete items with name "microwave"');
        END;
""")


# A conflict handler says what to do
def conflict_handler(reason: int, change: apsw.TableChange) -> int:
    """Report a conflict, keep the change for later, and skip it.

    *reason* is the conflict code, printed by name using
    apsw.mapping_session_conflict.  *change* is the change that could
    not be applied - it is added to the ``failed`` builder.

    Returns apsw.SQLITE_CHANGESET_OMIT so the apply proceeds, ignoring
    this failed change.
    """
    # print() puts a space between each of these fields.  The "\n"
    # fields break the report into several lines.
    report = (
        "conflict",
        apsw.mapping_session_conflict[reason],
        f"{change.op=} {change.opcode=}",
        "\n",
        f"{change.conflict=}",
        "\n",
        f"{change.name=} {change.column_count=}",
        "\n",
        f"{change.fk_conflicts=}",
        f"{change.indirect=}",
        "\n",
        f"{change.old=}\n",
        f"{change.new=}\n",
    )
    print(*report)

    # Remember what could not be applied so it can be inspected later
    failed.add_change(change)

    return apsw.SQLITE_CHANGESET_OMIT


# Undo our earlier changes again
# Undo our earlier changes again.  This time each conflict goes to
# the handler instead of aborting the apply.
apsw.Changeset.apply(undo, connection, conflict=conflict_handler)

# Now let's see what couldn't apply as SQL
show_changeset("failed", failed.output())
Exception exc=AbortError('query aborted')
conflict SQLITE_CHANGESET_DATA change.op='UPDATE' change.opcode=23
 change.conflict=('F9B19B', 'bathroom ceiling', 'Orange flavour')
 change.name='items' change.column_count=3
 change.fk_conflicts=None change.indirect=False
 change.old=('F9B19B', <class 'apsw.no_change'>, 'high gloss')
 change.new=(<class 'apsw.no_change'>, <class 'apsw.no_change'>, None)

conflict SQLITE_CHANGESET_CONSTRAINT change.op='DELETE' change.opcode=9
 change.conflict=None
 change.name='items' change.column_count=3
 change.fk_conflicts=None change.indirect=False
 change.old=('B9B16B', 'microwave', None)
 change.new=None

conflict SQLITE_CHANGESET_NOTFOUND change.op='DELETE' change.opcode=9
 change.conflict=None
 change.name='item_tag_link' change.column_count=3
 change.fk_conflicts=None change.indirect=False
 change.old=('3E3865', '3E8EC2', None)
 change.new=None

conflict SQLITE_CHANGESET_NOTFOUND change.op='DELETE' change.opcode=9
 change.conflict=None
 change.name='item_tag_link' change.column_count=3
 change.fk_conflicts=None change.indirect=False
 change.old=('3E3865', 'A0339B', None)
 change.new=None

conflict SQLITE_CHANGESET_NOTFOUND change.op='DELETE' change.opcode=9
 change.conflict=None
 change.name='item_tag_link' change.column_count=3
 change.fk_conflicts=None change.indirect=True
 change.old=('3E3865', 'BF1D0D', 'system')
 change.new=None

failed
UPDATE items SET description=NULL WHERE id = 'F9B19B' AND description = 'high gloss';
DELETE FROM items WHERE id = 'B9B16B' AND name = 'microwave' AND description IS NULL;
DELETE FROM item_tag_link WHERE item_id = '3E3865' AND tag_id = '3E8EC2' AND reason IS NULL;
DELETE FROM item_tag_link WHERE item_id = '3E3865' AND tag_id = 'A0339B' AND reason IS NULL;
/* indirect */ DELETE FROM item_tag_link WHERE item_id = '3E3865' AND tag_id = 'BF1D0D' AND reason = 'system';

Synchronising changes made by two users

Alice and Bob are going to separately work on the same database and we are going to synchronise their changes.

You will notice that the databases did not end up identical. This is because foreign keys, triggers, and the changesets are all fighting each other. You need to be careful when using all of them at the same time. See ChangesetBuilder next where you can make your own changesets for these more complicated situations.

# Start from the same database
# Start from the same database.  Each user gets their own file in the
# current directory, filled by a backup of our connection.
alice_connection = apsw.Connection("alice.db")
with alice_connection.backup("main", connection, "main") as backup:
    backup.step()

bob_connection = apsw.Connection("bob.db")
with bob_connection.backup("main", connection, "main") as backup:
    backup.step()

# setup sessions - attach() with no table name, which the earlier
# comment on table filtering describes as getting all tables
alice_session = apsw.Session(alice_connection, "main")
alice_session.attach()

bob_session = apsw.Session(bob_connection, "main")
bob_session.attach()

# Each makes changes
alice_connection.execute("""
    UPDATE tags SET label='painting' WHERE label='paint';
    INSERT INTO items(name, description) VALUES('storage closet', 'main storage space');
    INSERT INTO tags(label) VALUES('approval needed');
    -- remove new tag
    DELETE FROM item_tag_link WHERE item_id=(SELECT id FROM items WHERE name='storage closet');
    -- add approval needed
    INSERT INTO item_tag_link(item_id, tag_id)  VALUES(
        (SELECT id FROM items WHERE name='storage closet'),
        (SELECT id FROM tags WHERE label='approval needed')
    );
""")

bob_connection.execute("""
    UPDATE tags SET cost_centre = '150' WHERE label='electrical';
    INSERT INTO items(name) VALUES('storage B');
    -- remove new tag
    DELETE FROM item_tag_link WHERE item_id=(SELECT id FROM items WHERE name='storage B');
    -- add electrical
    INSERT INTO item_tag_link(item_id, tag_id)  VALUES(
        (SELECT id FROM items WHERE name='storage B'),
        (SELECT id FROM tags WHERE label='electrical')
    );
""")

# Get the changesets
alice_changeset = alice_session.changeset()

bob_changeset = bob_session.changeset()

# Apply them to each other's database.  No conflict handler is
# supplied, so a conflict would abort the apply.
apsw.Changeset.apply(alice_changeset, bob_connection)
apsw.Changeset.apply(bob_changeset, alice_connection)


# The same query is run against both databases so the resulting
# tables can be compared row by row
query = """
SELECT items.name AS name, tags.label AS tag, tags.cost_centre AS cost_centre
   FROM tags, items, item_tag_link
   WHERE items.id = item_tag_link.item_id AND tags.id = item_tag_link.tag_id
   ORDER BY items.name, tags.label
"""

print("Alice's database")
print(apsw.ext.format_query_table(alice_connection, query))

print("\nBob's database")
print(apsw.ext.format_query_table(bob_connection, query))

# Column names are looked up using the default connection of
# show_changeset, not alice's or bob's
show_changeset("Alice changeset", alice_changeset)

show_changeset("Bob changseset", bob_changeset)
Alice's database
┌──────────────────┬─────────────────┬─────────────┐
│       name       │       tag       │ cost_centre │
│ bathroom ceiling │ painting        │ 110         │
│ bathroom lights  │ electrical      │ 150         │
│ bathroom lights  │ inspection      │ 200         │
│ entrance fan     │ new             │ 100         │
│ entrance floor   │ battery         │ 300         │
│ entrance floor   │ cleaning        │ 300         │
│ storage B        │ electrical      │ 150         │
│ storage B        │ new             │ 100         │
│ storage closet   │ approval needed │ (null)      │
└──────────────────┴─────────────────┴─────────────┘


Bob's database
┌──────────────────┬─────────────────┬─────────────┐
│       name       │       tag       │ cost_centre │
│ bathroom ceiling │ painting        │ 110         │
│ bathroom lights  │ electrical      │ 150         │
│ bathroom lights  │ inspection      │ 200         │
│ entrance fan     │ new             │ 100         │
│ entrance floor   │ battery         │ 300         │
│ entrance floor   │ cleaning        │ 300         │
│ storage B        │ electrical      │ 150         │
│ storage closet   │ approval needed │ (null)      │
│ storage closet   │ new             │ 100         │
└──────────────────┴─────────────────┴─────────────┘

Alice changeset
UPDATE tags SET label='painting' WHERE id = '3E8EC2' AND label = 'paint';
INSERT INTO tags(id, label, cost_centre) VALUES ('309392', 'approval needed', NULL);
INSERT INTO items(id, name, description) VALUES ('B89998', 'storage closet', 'main storage space');
INSERT INTO item_tag_link(item_id, tag_id, reason) VALUES ('B89998', '309392', NULL);

Bob changseset
UPDATE tags SET cost_centre='150' WHERE id = '318E65' AND cost_centre = 145;
INSERT INTO items(id, name, description) VALUES ('7A3024', 'storage B', NULL);
INSERT INTO item_tag_link(item_id, tag_id, reason) VALUES ('7A3024', '318E65', NULL);

ChangesetBuilder

The ChangesetBuilder can be used to combine multiple changesets and individual TableChange. In this example we’ll build up all the changes to the items table from multiple changesets. ChangesetBuilder.schema() is used to ensure the changes map to the expected database table structure (names, primary keys, number of columns).

items = apsw.ChangesetBuilder()

# Ensure added changes map to the table structure of this database
items.schema(connection, "main")

# Go through every change of every changeset, keeping only those
# made to the items table
for source in (changeset, alice_changeset, bob_changeset):
    for change in apsw.Changeset.iter(source):
        if change.name == "items":
            items.add_change(change)

# The combined result is itself a changeset
only_items = items.output()

show_changeset("Only items table changes", only_items)
Only items table changes
UPDATE items SET description='high gloss' WHERE id = 'F9B19B' AND description IS NULL;
INSERT INTO items(id, name, description) VALUES ('B89998', 'storage closet', 'main storage space');
INSERT INTO items(id, name, description) VALUES ('7A3024', 'storage B', NULL);
INSERT INTO items(id, name, description) VALUES ('3E3865', 'kitchen walls', NULL);
INSERT INTO items(id, name, description) VALUES ('B9B16B', 'microwave', NULL);

Streaming

The changesets above were all produced as a single bytes object, held in memory all at once. For larger changesets we can read and write them in chunks, such as with blobs, files, or network connections.

# Use a positive number to set that size
# -1 is passed here and the value returned is reported as the
# default.  Use a positive number to set that size.
chunk_size = apsw.session_config(
    apsw.SQLITE_SESSION_CONFIG_STRMSIZE, -1
)
print("default chunk size", chunk_size)

# Some changes to make the changeset larger.  The size is an estimate.
print(f"Before estimate {session.changeset_size=}")
# One row for each letter, with the letter repeated 1234 times as
# both the name and the description
for i in "abcdefghijklmnopqrstuvwxyz":
    connection.execute(
        "INSERT INTO items(name, description) VALUES(?, ?)",
        (i * 1234, i * 1234),
    )
print(f"After estimate {session.changeset_size=}")

# We'll write to a file - binary mode, and opened for both writing
# and reading because it is read back further on
out = tempfile.TemporaryFile("w+b")

# How many times the write callback gets called
num_writes = 0


def write(data: memoryview) -> None:
    """Write one streamed chunk of the changeset to the ``out`` file.

    Each call is counted in the global ``num_writes``.

    Raises:
        OSError: If ``out`` reports writing a different number of
            bytes than were supplied.
    """
    global num_writes
    num_writes += 1
    written = out.write(data)
    # The streamer must write all bytes.  This is an explicit check
    # because assert statements are removed when Python runs with -O,
    # and a short write would then go unnoticed.
    if written != len(data):
        raise OSError(
            f"short write: {written} of {len(data)} bytes written"
        )


# The changeset is delivered by calling write with each chunk
session.changeset_stream(write)

# The file position is at the end of everything written
print("Output file size is", out.tell())
print("Number of writes", num_writes)

# Let's read from the same file, using the streaming iterator
out.seek(0)
# How many times the read callback gets called
num_reads = 0


def read(amount: int) -> bytes:
    """Return up to *amount* bytes from the ``out`` file.

    Each call is counted in the global ``num_reads``, before the read
    is attempted.
    """
    global num_reads
    num_reads = num_reads + 1
    chunk = out.read(amount)
    return chunk


# Iterate using the read callback instead of bytes held in memory.
# The changes are only counted, not examined.
num_changes = 0
for change in apsw.Changeset.iter(read):
    num_changes += 1

print("Number of reads", num_reads)
print("Number of changes", num_changes)
default chunk size 1024
Before estimate session.changeset_size=281
After estimate session.changeset_size=65541
Output file size is 65342
Number of writes 27
Number of reads 65
Number of changes 54

Rebasing

You can merge conflict decisions from an earlier changeset into a later changeset so that you don’t have to separately transport and store those conflict decisions. This can be used to take independently made changesets, and turn them into a linear sequence. The rebaser documentation includes more detail.

To do a rebase, you need to pass the conflict resolutions from a Changeset.apply() to Rebaser.configure(), and then Rebaser.rebase() a following changeset.

We are going to make alice then bob appear to have been done in that order without conflicts.

# Reset back to original data with the base changeset applied
# Remove the three tables - the link table is dropped before the
# tables it links
connection.execute("""
    DROP TABLE item_tag_link;
    DROP TABLE items;
    DROP TABLE tags;
""")

# Run the same SQL file used for the initial database setup
connection.execute(pathlib.Path("session.sql").read_text())


# The conflict handler we'll use doing latest writer wins - you should
# be more careful.
def conflict_handler(reason: int, change: apsw.TableChange) -> int:
    """Resolve conflicts so that the latest writer wins.

    Returns apsw.SQLITE_CHANGESET_REPLACE when *reason* is a data or
    conflict code, and apsw.SQLITE_CHANGESET_OMIT for every other
    reason.  *change* is not examined.
    """
    incoming_wins = reason in (
        apsw.SQLITE_CHANGESET_DATA,
        apsw.SQLITE_CHANGESET_CONFLICT,
    )
    return (
        apsw.SQLITE_CHANGESET_REPLACE
        if incoming_wins
        else apsw.SQLITE_CHANGESET_OMIT
    )


# apply original changeset that alice was based on
# apply original changeset that alice was based on
apsw.Changeset.apply(changeset, connection, conflict=conflict_handler)

# Make a rebaser
rebaser = apsw.Rebaser()

# save these conflict resolutions - rebase=True is what makes apply
# return them
conflict_resolutions = apsw.Changeset.apply(
    alice_changeset,
    connection,
    conflict=conflict_handler,
    rebase=True,
)

rebaser.configure(conflict_resolutions)

# and apply them to bob's.  The rebased changeset is only produced
# here - it is not applied to any database in this part of the example.
bob_rebased = rebaser.rebase(bob_changeset)

Table diff

Session.diff() can be used to get the difference between a table in another database and this database. This is useful if the other database was updated without a session being recorded. Note that the table must have a PRIMARY KEY, or it will be ignored.

# NOTE(review): diff_demo.db and other.db are files in the current
# directory, and the tables are created without IF NOT EXISTS - this
# presumably fails on a second run unless the files are removed first.
diff_demo = apsw.Connection("diff_demo.db")

diff_demo.execute("""
    -- our session runs on this
    CREATE TABLE example(x PRIMARY KEY, y, z);
    INSERT INTO example VALUES
            (1, 'one', 1.1),
            (2, 'two', 2.2),
            (3, 'three', 3.3);

    -- the other database
    ATTACH 'other.db' AS other;
    -- the table has to have the same name, primary key, and columns
    CREATE TABLE other.example(x PRIMARY KEY, y, z);
    INSERT INTO other.example VALUES
            -- extra row
            (0, 'zero', 0.0),
            -- id 1 deliberately missing
            (2, 'two', 2.2),
            -- different values
            (3, 'trois', 'hello');
""")

# This rebinds the name session - it no longer refers to the session
# on the first connection
session = apsw.Session(diff_demo, "main")

# You must attach (or filter) to include the table
session.attach("example")

# Nothing is changed through this session.  As the output shows, the
# changes recorded are those that turn other.example into main.example.
session.diff("other", "example")

diff = session.changeset()

# diff_demo is passed because the column names come from its schema
show_changeset("Table diff", diff, diff_demo)
Table diff
DELETE FROM example WHERE x = 0 AND y = 'zero' AND z = 0.0;
INSERT INTO example(x, y, z) VALUES (1, 'one', 1.1);
UPDATE example SET y='three', z=3.3 WHERE x = 3 AND y = 'trois' AND z = 'hello';

Cleanup

We can now close the connections, but it is optional. APSW automatically cleans up sessions when their corresponding connections are closed.

# Close every connection this example opened.  The sessions made on
# them are cleaned up when their connection closes.
connection.close()
alice_connection.close()
bob_connection.close()
# The table diff connection was previously left open
diff_demo.close()

# The temporary file used for streaming is released when closed
out.close()