Session Example/Tour
This example shows how to create and use the Session extension with APSW.
#!/usr/bin/env python3
# This code uses Python's optional typing annotations. You can
# ignore them and do not need to use them. If you do use them
# then you must include this future annotations line first.
from __future__ import annotations
import functools
import pathlib
import tempfile
import apsw
import apsw.ext
Is Session available?
Session must be enabled in SQLite at compile time, and in APSW at its compile time. (PyPI builds always have both enabled)
print("Session in SQLite:", "ENABLE_SESSION" in apsw.compile_options)
print(" Session in APSW:", hasattr(apsw, "Session"))
Session in SQLite: True
Session in APSW: True
Initial database setup
The database is populated with an items table, a tags table, and a link table allowing multiple tags per item. It has foreign keys and a trigger which cause changes beyond the supplied SQL, so we can see how the session handles them. See the full SQL in session.sql.
connection = apsw.Connection("")
connection.execute(pathlib.Path("session.sql").read_text())
Monitoring changes
You can have multiple Session objects monitoring changes on a connection. You need to call Session.attach() to say which tables to monitor, or use Session.table_filter() to get a callback. You can pause and resume monitoring with Session.enabled. For unmonitored changes you can use Session.diff() to work out the changes between two tables.
session = apsw.Session(connection, "main")
# enabled by default. You can set it to False while making
# changes you do not want recorded. It only stops recording
# changes to rows not already part of the changeset.
print(f"{session.enabled=}")
# We'd like size estimates
session.config(apsw.SQLITE_SESSION_OBJCONFIG_SIZE, True)
# we now say which tables to monitor - no tables are monitored by default.
# The tables must have PRIMARY KEY in their declaration otherwise
# nothing is recorded.
def table_filter(name: str) -> bool:
    print(f"table_filter {name=}")
    # We want them all
    return True
# We could also have done session.attach() to get all tables
# or attach with named tables of interest.
session.table_filter(table_filter)
# And now make some changes. We do every kind of change here -
# INSERT, UPDATE, and DELETE.
connection.execute("""
INSERT INTO items(name) VALUES('kitchen walls');
INSERT INTO item_tag_link(item_id, tag_id) VALUES(
(SELECT id FROM items WHERE name='kitchen walls'),
(SELECT id FROM tags WHERE label='paint')
);
INSERT INTO item_tag_link(item_id, tag_id) VALUES(
(SELECT id FROM items WHERE name='kitchen walls'),
(SELECT id FROM tags WHERE label='cleaning')
);
INSERT INTO items(name) VALUES('microwave');
UPDATE items SET description='high gloss' WHERE name='bathroom ceiling';
UPDATE tags SET cost_centre=null WHERE label='new';
DELETE FROM tags WHERE label='battery';
""")
# How much memory is the session using?
print(f"{session.memory_used=}")
session.enabled=True
table_filter name='items'
table_filter name='item_tag_link'
table_filter name='tags'
session.memory_used=7352
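A minimal sketch of the pause and resume behaviour mentioned above (illustrative only - the inserted row is hypothetical and is not part of the tour's recorded output):

session.enabled = False
# While disabled, changes to rows not already part of the changeset
# are not recorded
connection.execute("INSERT INTO items(name) VALUES('scratch row')")
session.enabled = True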
SQL equivalent of a changeset
We can iterate the contents of a changeset as SQL statements using apsw.ext.changeset_to_sql(). It needs to know the column names because changesets only use column numbers, so we use apsw.ext.find_columns() giving it the connection to inspect.
def show_changeset(
    title: str,
    contents: apsw.SessionStreamInput,
    connection: apsw.Connection = connection,
):
    print(title)
    for statement in apsw.ext.changeset_to_sql(
        contents,
        get_columns=functools.partial(
            apsw.ext.find_columns, connection=connection
        ),
    ):
        print(statement)
    print()
Patchsets and Changesets
Changesets contain all the before and after values for changed rows, while patchsets only contain the values necessary to make the change. apsw.ext.changeset_to_sql() is useful to see what SQL a changeset or patchset is equivalent to.
patchset = session.patchset()
print(f"{len(patchset)=}")
show_changeset("patchset", patchset)
# Note how the changeset is larger and contains more information
changeset = session.changeset()
print(f"{len(changeset)=}")
show_changeset("changeset", changeset)
len(patchset)=241
patchset
UPDATE items SET description='high gloss' WHERE id = 'F9B19B';
INSERT INTO items(id, name, description) VALUES ('3E3865', 'kitchen walls', NULL);
INSERT INTO items(id, name, description) VALUES ('B9B16B', 'microwave', NULL);
/* indirect */ DELETE FROM item_tag_link WHERE item_id = '4579B7' AND tag_id = '00BF23';
INSERT INTO item_tag_link(item_id, tag_id, reason) VALUES ('3E3865', '3E8EC2', NULL);
INSERT INTO item_tag_link(item_id, tag_id, reason) VALUES ('3E3865', 'A0339B', NULL);
/* indirect */ INSERT INTO item_tag_link(item_id, tag_id, reason) VALUES ('3E3865', 'BF1D0D', 'system');
/* indirect */ INSERT INTO item_tag_link(item_id, tag_id, reason) VALUES ('B9B16B', 'BF1D0D', 'system');
UPDATE tags SET cost_centre=NULL WHERE id = 'BF1D0D';
DELETE FROM tags WHERE id = '00BF23';
len(changeset)=274
changeset
UPDATE items SET description='high gloss' WHERE id = 'F9B19B' AND description IS NULL;
INSERT INTO items(id, name, description) VALUES ('3E3865', 'kitchen walls', NULL);
INSERT INTO items(id, name, description) VALUES ('B9B16B', 'microwave', NULL);
/* indirect */ DELETE FROM item_tag_link WHERE item_id = '4579B7' AND tag_id = '00BF23' AND reason IS NULL;
INSERT INTO item_tag_link(item_id, tag_id, reason) VALUES ('3E3865', '3E8EC2', NULL);
INSERT INTO item_tag_link(item_id, tag_id, reason) VALUES ('3E3865', 'A0339B', NULL);
/* indirect */ INSERT INTO item_tag_link(item_id, tag_id, reason) VALUES ('3E3865', 'BF1D0D', 'system');
/* indirect */ INSERT INTO item_tag_link(item_id, tag_id, reason) VALUES ('B9B16B', 'BF1D0D', 'system');
UPDATE tags SET cost_centre=NULL WHERE id = 'BF1D0D' AND cost_centre = 100;
DELETE FROM tags WHERE id = '00BF23' AND label = 'battery' AND cost_centre = 300;
Inverting - undo, redo
We can get the opposite of a changeset, which can then form the basis of an undo/redo implementation. One pattern is to have a table where you store changesets, allowing for a later undo or redo (see the sketch after the output below).
# Yes, it is this easy
undo = apsw.Changeset.invert(changeset)
# Compare this to the changeset above, to see how it does the
# opposite.
show_changeset("undo", undo)
undo
UPDATE items SET description=NULL WHERE id = 'F9B19B' AND description = 'high gloss';
DELETE FROM items WHERE id = '3E3865' AND name = 'kitchen walls' AND description IS NULL;
DELETE FROM items WHERE id = 'B9B16B' AND name = 'microwave' AND description IS NULL;
/* indirect */ INSERT INTO item_tag_link(item_id, tag_id, reason) VALUES ('4579B7', '00BF23', NULL);
DELETE FROM item_tag_link WHERE item_id = '3E3865' AND tag_id = '3E8EC2' AND reason IS NULL;
DELETE FROM item_tag_link WHERE item_id = '3E3865' AND tag_id = 'A0339B' AND reason IS NULL;
/* indirect */ DELETE FROM item_tag_link WHERE item_id = '3E3865' AND tag_id = 'BF1D0D' AND reason = 'system';
/* indirect */ DELETE FROM item_tag_link WHERE item_id = 'B9B16B' AND tag_id = 'BF1D0D' AND reason = 'system';
UPDATE tags SET cost_centre=100 WHERE id = 'BF1D0D' AND cost_centre IS NULL;
INSERT INTO tags(id, label, cost_centre) VALUES ('00BF23', 'battery', 300);
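A sketch of the storage pattern mentioned above, assuming a hypothetical undo_log table (illustrative only, not part of the running tour):

# Hypothetical table for keeping changesets around for later undo/redo
connection.execute(
    "CREATE TABLE IF NOT EXISTS undo_log(id INTEGER PRIMARY KEY, description, data BLOB)"
)
connection.execute(
    "INSERT INTO undo_log(description, data) VALUES(?, ?)",
    ("kitchen and tag edits", undo),
)
# Later, fetch the most recent entry ...
stored = connection.execute(
    "SELECT data FROM undo_log ORDER BY id DESC LIMIT 1"
).fetchall()[0][0]
# ... and apply it to perform the undo
# apsw.Changeset.apply(stored, connection)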
Applying changesets
We can filter which tables get affected when applying a changeset (default all - see the sketch at the end of this section) and can define a conflict handler (default abort the transaction). Conflicts are described in the SQLite session documentation. We are going to undo our earlier changes.
# However it is going to fail ...
try:
    apsw.Changeset.apply(undo, connection)
except apsw.AbortError as exc:
    # It fails, and the database is left in the state it was in when
    # we started the apply.
    print(f"Exception {exc=}")
# It failed because the foreign keys automatically removed rows in the
# link table when members of items and tags were removed. Let's do it
# again, but save the failed changes for inspection.
failed = apsw.ChangesetBuilder()
# And make some deliberate conflicts
connection.execute("""
UPDATE items SET description = 'Orange flavour' WHERE name = 'bathroom ceiling';
-- Refuse deletes to make constraint failure on delete
CREATE TRIGGER prevent_microwave_deletion
BEFORE DELETE ON items
FOR EACH ROW
WHEN OLD.name = 'microwave'
BEGIN
SELECT RAISE(ABORT, 'Cannot delete items with name "microwave"');
END;
""")
# A conflict handler says what to do
def conflict_handler(reason: int, change: apsw.TableChange) -> int:
    # Print the failure information
    print(
        "conflict",
        apsw.mapping_session_conflict[reason],
        f"{change.op=} {change.opcode=}",
        "\n",
        f"{change.conflict=}",
        "\n",
        f"{change.name=} {change.column_count=}",
        "\n",
        f"{change.fk_conflicts=}",
        f"{change.indirect=}",
        "\n",
        f"{change.old=}\n",
        f"{change.new=}\n",
    )
    # save the change for later
    failed.add_change(change)
    # proceed ignoring this failed change
    return apsw.SQLITE_CHANGESET_OMIT
# Undo our earlier changes again
apsw.Changeset.apply(undo, connection, conflict=conflict_handler)
# Now let's see what couldn't be applied, as SQL
show_changeset("failed", failed.output())
Exception exc=AbortError('query aborted')
conflict SQLITE_CHANGESET_DATA change.op='UPDATE' change.opcode=23
change.conflict=('F9B19B', 'bathroom ceiling', 'Orange flavour')
change.name='items' change.column_count=3
change.fk_conflicts=None change.indirect=False
change.old=('F9B19B', <class 'apsw.no_change'>, 'high gloss')
change.new=(<class 'apsw.no_change'>, <class 'apsw.no_change'>, None)
conflict SQLITE_CHANGESET_CONSTRAINT change.op='DELETE' change.opcode=9
change.conflict=None
change.name='items' change.column_count=3
change.fk_conflicts=None change.indirect=False
change.old=('B9B16B', 'microwave', None)
change.new=None
conflict SQLITE_CHANGESET_NOTFOUND change.op='DELETE' change.opcode=9
change.conflict=None
change.name='item_tag_link' change.column_count=3
change.fk_conflicts=None change.indirect=False
change.old=('3E3865', '3E8EC2', None)
change.new=None
conflict SQLITE_CHANGESET_NOTFOUND change.op='DELETE' change.opcode=9
change.conflict=None
change.name='item_tag_link' change.column_count=3
change.fk_conflicts=None change.indirect=False
change.old=('3E3865', 'A0339B', None)
change.new=None
conflict SQLITE_CHANGESET_NOTFOUND change.op='DELETE' change.opcode=9
change.conflict=None
change.name='item_tag_link' change.column_count=3
change.fk_conflicts=None change.indirect=True
change.old=('3E3865', 'BF1D0D', 'system')
change.new=None
failed
UPDATE items SET description=NULL WHERE id = 'F9B19B' AND description = 'high gloss';
DELETE FROM items WHERE id = 'B9B16B' AND name = 'microwave' AND description IS NULL;
DELETE FROM item_tag_link WHERE item_id = '3E3865' AND tag_id = '3E8EC2' AND reason IS NULL;
DELETE FROM item_tag_link WHERE item_id = '3E3865' AND tag_id = 'A0339B' AND reason IS NULL;
/* indirect */ DELETE FROM item_tag_link WHERE item_id = '3E3865' AND tag_id = 'BF1D0D' AND reason = 'system';
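As mentioned at the start of this section, an apply can also be limited to particular tables. A minimal sketch, assuming the filter keyword argument takes the same kind of table-name callback as Session.table_filter() (illustrative only, commented out so the tour's later output is unchanged):

def items_only(name: str) -> bool:
    # Only let changes to the items table through
    return name == "items"

# apsw.Changeset.apply(undo, connection, filter=items_only, conflict=conflict_handler)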
Synchronising changes made by two users
Alice and Bob are going to separately work on the same database and we are going to synchronise their changes.
You will notice that the databases did not end up identical. This is because foreign keys, triggers, and the changesets are all fighting each other. You need to be careful when using all of them at the same time. See ChangesetBuilder next where you can make your own changesets for these more complicated situations.
# Start from the same database
alice_connection = apsw.Connection("alice.db")
with alice_connection.backup("main", connection, "main") as backup:
    backup.step()
bob_connection = apsw.Connection("bob.db")
with bob_connection.backup("main", connection, "main") as backup:
    backup.step()
# setup sessions
alice_session = apsw.Session(alice_connection, "main")
alice_session.attach()
bob_session = apsw.Session(bob_connection, "main")
bob_session.attach()
# Each makes changes
alice_connection.execute("""
UPDATE tags SET label='painting' WHERE label='paint';
INSERT INTO items(name, description) VALUES('storage closet', 'main storage space');
INSERT INTO tags(label) VALUES('approval needed');
-- remove new tag
DELETE FROM item_tag_link WHERE item_id=(SELECT id FROM items WHERE name='storage closet');
-- add approval needed
INSERT INTO item_tag_link(item_id, tag_id) VALUES(
(SELECT id FROM items WHERE name='storage closet'),
(SELECT id FROM tags WHERE label='approval needed')
);
""")
bob_connection.execute("""
UPDATE tags SET cost_centre = '150' WHERE label='electrical';
INSERT INTO items(name) VALUES('storage B');
-- remove new tag
DELETE FROM item_tag_link WHERE item_id=(SELECT id FROM items WHERE name='storage B');
-- add electrical
INSERT INTO item_tag_link(item_id, tag_id) VALUES(
(SELECT id FROM items WHERE name='storage B'),
(SELECT id FROM tags WHERE label='electrical')
);
""")
# Get the changesets
alice_changeset = alice_session.changeset()
bob_changeset = bob_session.changeset()
# Apply them to each other's database
apsw.Changeset.apply(alice_changeset, bob_connection)
apsw.Changeset.apply(bob_changeset, alice_connection)
query = """
SELECT items.name AS name, tags.label AS tag, tags.cost_centre AS cost_centre
FROM tags, items, item_tag_link
WHERE items.id = item_tag_link.item_id AND tags.id = item_tag_link.tag_id
ORDER BY items.name, tags.label
"""
print("Alice's database")
print(apsw.ext.format_query_table(alice_connection, query))
print("\nBob's database")
print(apsw.ext.format_query_table(bob_connection, query))
show_changeset("Alice changeset", alice_changeset)
show_changeset("Bob changseset", bob_changeset)
Alice's database
┌──────────────────┬─────────────────┬─────────────┐
│ name │ tag │ cost_centre │
│ bathroom ceiling │ painting │ 110 │
│ bathroom lights │ electrical │ 150 │
│ bathroom lights │ inspection │ 200 │
│ entrance fan │ new │ 100 │
│ entrance floor │ battery │ 300 │
│ entrance floor │ cleaning │ 300 │
│ storage B │ electrical │ 150 │
│ storage B │ new │ 100 │
│ storage closet │ approval needed │ (null) │
└──────────────────┴─────────────────┴─────────────┘
Bob's database
┌──────────────────┬─────────────────┬─────────────┐
│ name │ tag │ cost_centre │
│ bathroom ceiling │ painting │ 110 │
│ bathroom lights │ electrical │ 150 │
│ bathroom lights │ inspection │ 200 │
│ entrance fan │ new │ 100 │
│ entrance floor │ battery │ 300 │
│ entrance floor │ cleaning │ 300 │
│ storage B │ electrical │ 150 │
│ storage closet │ approval needed │ (null) │
│ storage closet │ new │ 100 │
└──────────────────┴─────────────────┴─────────────┘
Alice changeset
UPDATE tags SET label='painting' WHERE id = '3E8EC2' AND label = 'paint';
INSERT INTO tags(id, label, cost_centre) VALUES ('309392', 'approval needed', NULL);
INSERT INTO items(id, name, description) VALUES ('B89998', 'storage closet', 'main storage space');
INSERT INTO item_tag_link(item_id, tag_id, reason) VALUES ('B89998', '309392', NULL);
Bob changeset
UPDATE tags SET cost_centre='150' WHERE id = '318E65' AND cost_centre = 145;
INSERT INTO items(id, name, description) VALUES ('7A3024', 'storage B', NULL);
INSERT INTO item_tag_link(item_id, tag_id, reason) VALUES ('7A3024', '318E65', NULL);
ChangesetBuilder
The ChangesetBuilder can be used to combine multiple changesets and individual TableChange objects. In this example we'll build up all the changes to the items table from multiple changesets. ChangesetBuilder.schema() is used to ensure the changes map to the expected database table structure (names, primary keys, number of columns).
items = apsw.ChangesetBuilder()
items.schema(connection, "main")
for source in (changeset, alice_changeset, bob_changeset):
    for change in apsw.Changeset.iter(source):
        if change.name == "items":
            items.add_change(change)
only_items = items.output()
show_changeset("Only items table changes", only_items)
Only items table changes
UPDATE items SET description='high gloss' WHERE id = 'F9B19B' AND description IS NULL;
INSERT INTO items(id, name, description) VALUES ('B89998', 'storage closet', 'main storage space');
INSERT INTO items(id, name, description) VALUES ('7A3024', 'storage B', NULL);
INSERT INTO items(id, name, description) VALUES ('3E3865', 'kitchen walls', NULL);
INSERT INTO items(id, name, description) VALUES ('B9B16B', 'microwave', NULL);
Streaming
The changesets above were all produced as a single bytes object in memory, all at once. For larger changesets we can read and write them in chunks, such as with blobs, files, or network connections.
# The chunk size used for streaming. A negative value queries the
# current size, while a positive number sets it.
chunk_size = apsw.session_config(
    apsw.SQLITE_SESSION_CONFIG_STRMSIZE, -1
)
print("default chunk size", chunk_size)
# Some changes to make the changeset larger. The size is an estimate.
print(f"Before estimate {session.changeset_size=}")
for i in "abcdefghijklmnopqrstuvwxyz":
connection.execute(
"INSERT INTO items(name, description) VALUES(?, ?)",
(i * 1234, i * 1234),
)
print(f"After estimate {session.changeset_size=}")
# We'll write to a file
out = tempfile.TemporaryFile("w+b")
num_writes = 0
def write(data: memoryview) -> None:
    global num_writes
    num_writes += 1
    res = out.write(data)
    # The streamer must write all bytes
    assert res == len(data)
session.changeset_stream(write)
print("Output file size is", out.tell())
print("Number of writes", num_writes)
# Let's read from the same file, using the streaming iterator
out.seek(0)
num_reads = 0
def read(amount: int) -> bytes:
    global num_reads
    num_reads += 1
    return out.read(amount)
num_changes = 0
for change in apsw.Changeset.iter(read):
    num_changes += 1
print("Number of reads", num_reads)
print("Number of changes", num_changes)
default chunk size 1024
Before estimate session.changeset_size=281
After estimate session.changeset_size=65541
Output file size is 65342
Number of writes 27
Number of reads 65
Number of changes 54
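The same streaming callbacks work with any writer and reader pair, for example an in-memory buffer standing in for a blob or network connection. A minimal sketch:

import io

buffer = io.BytesIO()
# BytesIO.write accepts each memoryview chunk and writes it in full
session.changeset_stream(buffer.write)
buffer.seek(0)
# The reader callback is given the number of bytes wanted
for change in apsw.Changeset.iter(buffer.read):
    pass  # each change is an apsw.TableChange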
Rebasing
You can merge conflict decisions from an earlier changeset into a later changeset so that you don’t have to separately transport and store those conflict decisions. This can be used to take independently made changesets, and turn them into a linear sequence. The rebaser documentation includes more detail.
To do a rebase, you take the conflict resolutions from a Changeset.apply(), pass them to Rebaser.configure(), and then Rebaser.rebase() a following changeset. We are going to make Alice's then Bob's changes appear to have been done in that order, without conflicts.
# Reset back to original data with the base changeset applied
connection.execute("""
DROP TABLE item_tag_link;
DROP TABLE items;
DROP TABLE tags;
""")
connection.execute(pathlib.Path("session.sql").read_text())
# The conflict handler we'll use does latest-writer-wins - you should
# be more careful.
def conflict_handler(reason: int, change: apsw.TableChange) -> int:
    if reason in (
        apsw.SQLITE_CHANGESET_DATA,
        apsw.SQLITE_CHANGESET_CONFLICT,
    ):
        return apsw.SQLITE_CHANGESET_REPLACE
    return apsw.SQLITE_CHANGESET_OMIT
# apply the original changeset that Alice's and Bob's changes were based on
apsw.Changeset.apply(changeset, connection, conflict=conflict_handler)
# Make a rebaser
rebaser = apsw.Rebaser()
# save these conflict resolutions
conflict_resolutions = apsw.Changeset.apply(
    alice_changeset,
    connection,
    conflict=conflict_handler,
    rebase=True,
)
rebaser.configure(conflict_resolutions)
# and apply them to Bob's changeset
bob_rebased = rebaser.rebase(bob_changeset)
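The rebased changeset now carries Alice's conflict resolutions with it, so it can be applied on top of her changes. A sketch, reusing the latest-writer-wins handler above:

# Apply Bob's rebased changes on top of Alice's
apsw.Changeset.apply(bob_rebased, connection, conflict=conflict_handler)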
Table diff
Session.diff() can be used to get the difference between a table in another database and this database. This is useful if the other database was updated without a session being recorded. Note that the table must have a PRIMARY KEY, or it will be ignored.
diff_demo = apsw.Connection("diff_demo.db")
diff_demo.execute("""
-- our session runs on this
CREATE TABLE example(x PRIMARY KEY, y, z);
INSERT INTO example VALUES
(1, 'one', 1.1),
(2, 'two', 2.2),
(3, 'three', 3.3);
-- the other database
ATTACH 'other.db' AS other;
-- the table has to have the same name, primary key, and columns
CREATE TABLE other.example(x PRIMARY KEY, y, z);
INSERT INTO other.example VALUES
-- extra row
(0, 'zero', 0.0),
-- id 1 deliberately missing
(2, 'two', 2.2),
-- different values
(3, 'trois', 'hello');
""")
session = apsw.Session(diff_demo, "main")
# You must attach (or filter) to include the table
session.attach("example")
session.diff("other", "example")
diff = session.changeset()
show_changeset("Table diff", diff, diff_demo)
Table diff
DELETE FROM example WHERE x = 0 AND y = 'zero' AND z = 0.0;
INSERT INTO example(x, y, z) VALUES (1, 'one', 1.1);
UPDATE example SET y='three', z=3.3 WHERE x = 3 AND y = 'trois' AND z = 'hello';
Cleanup
We can now close the connections, but it is optional. APSW automatically cleans up sessions when their corresponding connections are closed.
connection.close()
alice_connection.close()
bob_connection.close()