# Process FTS5 queries as documented at https://www.sqlite.org/fts5.html#full_text_query_syntax
# The actual Lemon grammar used is at
# https://sqlite.org/src/file?name=ext/fts5/fts5parse.y
# Tokens are extracted by fts5ExprGetToken, see
# https://sqlite.org/src/file?name=ext/fts5/fts5_expr.c
"""
:mod:`apsw.fts5query` Create, parse, and modify queries
There are 3 representations of a query available:
query string
This is the string syntax `accepted by FTS5
<https://www.sqlite.org/fts5.html#full_text_query_syntax>`__ where
you represent AND, OR, NEAR, column filtering etc inline in the
string. An example is::
    love AND (title:^"big world" NOT summary:"sunset cruise")
parsed
This is a hierarchical representation using :mod:`dataclasses`
with all fields present. Represented as :class:`QUERY`, it uses
:class:`PHRASE`, :class:`NEAR`, :class:`COLUMNFILTER`,
:class:`AND`, :class:`NOT`, and :class:`OR`. The string example
above is::
    AND(queries=[PHRASE(phrase='love', initial=False, prefix=False, plus=None),
NOT(match=COLUMNFILTER(columns=['title'],
filter='include',
query=PHRASE(phrase='big world',
initial=True,
prefix=False,
plus=None)),
no_match=COLUMNFILTER(columns=['summary'],
filter='include',
query=PHRASE(phrase='sunset cruise',
initial=False,
prefix=False,
plus=None)))])
dict
This is a hierarchical representation using Python
:class:`dictionaries <dict>` which is easy for logging, storing as
JSON, and manipulating. Fields containing default values are
omitted. When provided to methods in this module, you do not need
to provide intermediate PHRASE nodes - plain Python lists and
strings are accepted directly. This is the easiest form in which
to programmatically compose and modify queries. The string example
above is::
    {'@': 'AND',
'queries': [{'@': 'PHRASE', 'phrase': 'love'},
{'@': 'NOT',
'match': {'@': 'COLUMNFILTER',
'columns': ['title'],
'filter': 'include',
'query': {'@': 'PHRASE',
'initial': True,
'phrase': 'big world'}},
'no_match': {'@': 'COLUMNFILTER',
'columns': ['summary'],
'filter': 'include',
'query': {'@': 'PHRASE',
'phrase': 'sunset cruise'}}}]}
See :ref:`the example <example_fts_query>`.
.. list-table:: Conversion functions
:header-rows: 1
:widths: auto
* - From type
- To type
- Conversion method
* - query string
- parsed
- :func:`parse_query_string`
* - parsed
- dict
- :func:`to_dict`
* - dict
- parsed
- :func:`from_dict`
* - parsed
- query string
- :func:`to_query_string`
Other helpful functionality includes:
* :func:`quote` to appropriately double quote strings
* :func:`extract_with_column_filters` to get a :class:`QUERY` for a node within
  an existing :class:`QUERY`, applying the intermediate column filters.
* :func:`applicable_columns` to work out which columns apply to part of a
:class:`QUERY`
* :func:`walk` to traverse a parsed query
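
As a sketch of moving between the three forms using the functions
above::

    q = parse_query_string("hello OR world")   # query string -> parsed
    d = to_dict(q)                             # parsed -> dict
    q2 = from_dict(d)                          # dict -> parsed
    text = to_query_string(q2)                 # parsed -> query string

Round-tripping like this gives an equivalent query, although the
resulting text may be normalised differently (eg quoting).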
"""
from __future__ import annotations
import sys
import dataclasses
try:
from typing import Union, Any, Sequence, NoReturn, Literal, Iterator, TypeAlias
except ImportError:
# TypeAlias is not in Python <= 3.9
pass
import apsw
QUERY_TOKENS_MARKER = "$!Tokens~"
"Special marker at the start of a string to recognise it as a list of tokens for :class:`QueryTokens`"
@dataclasses.dataclass
class QueryTokens:
"""`FTS5 query strings <https://www.sqlite.org/fts5.html#fts5_strings>`__ are
passed to `tokenizers
<https://www.sqlite.org/fts5.html#tokenizers>`__ which extract
tokens, such as by splitting on whitespace, lower casing text, and
removing characters like accents.
If you want to query tokens directly then use this class with the
:attr:`tokens` member, use it where :attr:`PHRASE.phrase` goes, and
use :func:`to_query_string` to compose your query.
Your FTS5 table must use the
:class:`apsw.fts5.QueryTokensTokenizer` as the first tokenizer in
the list. If the reason for tokenizing includes
`FTS5_TOKENIZE_QUERY` and the text to be tokenized starts with the
special marker, then the tokens are returned.
:attr:`apsw.fts5.Table.supports_query_tokens` will tell you if
query tokens are handled correctly.
:meth:`apsw.fts5.Table.create` parameter ``support_query_tokens``
will ensure the ``tokenize`` table option is correctly set. You
can get the tokens from :attr:`apsw.fts5.Table.tokens`.
You can construct QueryTokens like this::
    # One token
    QueryTokens(["hello"])
    # Token sequence
    QueryTokens(["hello", "world", "today"])
    # Colocated tokens use a nested list
    QueryTokens(["hello", ["first", "1st"]])

To use in a query::

    {"@": "NOT", "match": QueryTokens(["hello", "world"]),
     "no_match": QueryTokens([["first", "1st"]])}
That would be equivalent to a query of ``"Hello World" NOT
"First"`` if tokens were lower cased, and a tokenizer added a
colocated ``1st`` on seeing ``first``.
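
The encoded text form (produced by :meth:`encode`, recognised by
:meth:`decode`) is an internal format; as an illustration of the
current scheme::

    QueryTokens(["hello", ("first", "1st")]).encode()
    # -> '$!Tokens~hello|first>1st'

    QueryTokens.decode("$!Tokens~hello|first>1st")
    # -> QueryTokens(tokens=['hello', ('first', '1st')])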
"""
tokens: list[str | Sequence[str]]
"The tokens"
@classmethod
def _zero_encode(cls, s: str) -> str:
"Encode any zero bytes"
return s.replace("\0", "$!ZeRo")
@classmethod
def _zero_decode(cls, s: str) -> str:
"Decode any zero bytes"
return s.replace("$!ZeRo", "\0")
def encode(self) -> str:
"Produces the tokens encoded with the marker and separator"
res = ""
for token in self.tokens:
if res:
res += "|"
if isinstance(token, str):
res += self._zero_encode(token)
else:
res += ">".join(self._zero_encode(t) for t in token)
return QUERY_TOKENS_MARKER + res
@classmethod
def decode(cls, data: str | bytes) -> QueryTokens | None:
"If the marker is present then returns the corresponding :class:`QueryTokens`, otherwise `None`."
if isinstance(data, bytes) and data.startswith(QUERY_TOKENS_MARKER.encode()):
data = data.decode()
if isinstance(data, str) and data.startswith(QUERY_TOKENS_MARKER):
stream: list[str | Sequence[str]] = [
cls._zero_decode(token) for token in data[len(QUERY_TOKENS_MARKER) :].split("|")
]
for i, token in enumerate(stream):
if ">" in token:
stream[i] = tuple(token.split(">"))
return cls(stream)
return None
@dataclasses.dataclass
class PHRASE:
"One `phrase <https://www.sqlite.org/fts5.html#fts5_phrases>`__"
phrase: str | QueryTokens
"Text of the phrase. If + was used (eg one+two) then it will be a list of phrases"
initial: bool = False
"If True then the phrase must match the beginning of a column (``^`` was used)"
prefix: bool = False
"If True then if it is a prefix search on the last token in phrase (``*`` was used)"
plus: PHRASE | None = None
"Additional phrase segment, joined by ``+`` in queries"
@dataclasses.dataclass
class NEAR:
"`Near query <https://www.sqlite.org/fts5.html#fts5_near_queries>`__"
phrases: Sequence[PHRASE]
"Two or more phrases"
distance: int = 10
"Maximum distance between the phrases"
@dataclasses.dataclass
class COLUMNFILTER:
"""Limit query to `certain columns <https://www.sqlite.org/fts5.html#fts5_column_filters>`__
This always reduces the columns that phrase matching will be done
against.
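
For example ``title: hello`` parses to::

    COLUMNFILTER(columns=['title'], filter='include',
                 query=PHRASE(phrase='hello', initial=False,
                              prefix=False, plus=None))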
"""
columns: Sequence[str]
"Limit phrase matching by these columns"
filter: Literal["include"] | Literal["exclude"]
"Including or excluding the columns"
query: QUERY
"query the filter applies to, including all nested queries"
@dataclasses.dataclass
class AND:
"All queries `must match <https://www.sqlite.org/fts5.html#fts5_boolean_operators>`__"
queries: Sequence[QUERY]
@dataclasses.dataclass
class OR:
"Any query `must match <https://www.sqlite.org/fts5.html#fts5_boolean_operators>`__"
queries: Sequence[QUERY]
@dataclasses.dataclass
class NOT:
"match `must match <https://www.sqlite.org/fts5.html#fts5_boolean_operators>`__, but no_match `must not <https://www.sqlite.org/fts5.html#fts5_boolean_operators>`__"
match: QUERY
no_match: QUERY
# Sphinx makes this real ugly
# https://github.com/sphinx-doc/sphinx/issues/10541
QUERY: TypeAlias = Union[COLUMNFILTER, NEAR, AND, OR, NOT, PHRASE]
"""Type representing all query types."""
def to_dict(q: QUERY) -> dict[str, Any]:
"""Converts structure to a dict
This is useful for pretty printing, logging, saving as JSON,
modifying etc.
The dict has a key ``@`` with value corresponding to the dataclass
(eg ``NEAR``, ``PHRASE``, ``AND``) and the same field names as the
corresponding dataclasses. Only fields with non-default values
are emitted.
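
For example::

    to_dict(AND([PHRASE("hello"), PHRASE("world", prefix=True)]))
    # -> {'@': 'AND',
    #     'queries': [{'@': 'PHRASE', 'phrase': 'hello'},
    #                 {'@': 'PHRASE', 'phrase': 'world', 'prefix': True}]}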
"""
# @ was picked because it gets printed first if dict keys are sorted, and
# won't conflict with any other key names
if isinstance(q, PHRASE):
res = {"@": "PHRASE", "phrase": q.phrase}
if q.prefix:
res["prefix"] = True
if q.initial:
res["initial"] = True
if q.plus:
res["plus"] = to_dict(q.plus)
return res
if isinstance(q, AND):
return {"@": "AND", "queries": [to_dict(query) for query in q.queries]}
if isinstance(q, OR):
return {"@": "OR", "queries": [to_dict(query) for query in q.queries]}
if isinstance(q, NOT):
return {"@": "NOT", "match": to_dict(q.match), "no_match": to_dict(q.no_match)}
if isinstance(q, NEAR):
res = {"@": "NEAR", "phrases": [to_dict(phrase) for phrase in q.phrases]}
if q.distance != 10:
res["distance"] = q.distance
return res
if isinstance(q, COLUMNFILTER):
return {"@": "COLUMNFILTER", "query": to_dict(q.query), "columns": q.columns, "filter": q.filter}
raise TypeError(f"Unexpected value {q=}")
_dict_name_class = {
"PHRASE": PHRASE,
"NEAR": NEAR,
"COLUMNFILTER": COLUMNFILTER,
"AND": AND,
"OR": OR,
"NOT": NOT,
}
def from_dict(d: dict[str, Any] | Sequence[str] | str | QueryTokens) -> QUERY:
"""Turns dict back into a :class:`QUERY`
You can take shortcuts putting `str` or :class:`QueryTokens` in
places where PHRASE is expected. For example this is accepted::
    {
        "@": "AND",
        "queries": ["hello", "world"]
    }
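
which produces the same result as::

    AND([PHRASE("hello"), PHRASE("world")])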
"""
if isinstance(d, (str, QueryTokens)):
return PHRASE(d)
if isinstance(d, (Sequence, set)):
res = AND([from_dict(item) for item in d])
if len(res.queries) == 0:
raise ValueError(f"Expected at least one item in {d!r}")
if len(res.queries) == 1:
return res.queries[0]
return res
_type_check(d, dict)
if "@" not in d:
raise ValueError(f"Expected key '@' in dict {d!r}")
klass = _dict_name_class.get(d["@"])
if klass is None:
raise ValueError(f"\"{d['@']}\" is not a known query type")
if klass is PHRASE:
res = PHRASE(
_type_check(d["phrase"], (str, QueryTokens)),
initial=_type_check(d.get("initial", False), bool),
prefix=_type_check(d.get("prefix", False), bool),
)
if "plus" in d:
res.plus = _type_check(from_dict(d["plus"]), PHRASE)
return res
if klass is OR or klass is AND:
queries = d.get("queries")
if not isinstance(queries, (Sequence, set)) or len(queries) < 1:
raise ValueError(f"{d!r} 'queries' must be sequence of at least 1 items")
as_queries = [from_dict(query) for query in queries]
if len(as_queries) == 1:
return as_queries[0]
return klass(as_queries)
if klass is NEAR:
phrases = [_type_check(from_dict(phrase), PHRASE) for phrase in d["phrases"]]
if len(phrases) < 1:
raise ValueError(f"There must be at least one NEAR phrase in {phrases!r}")
res = klass(phrases, _type_check(d.get("distance", 10), int))
if res.distance < 1:
raise ValueError(f"NEAR distance must be at least one in {d!r}")
return res
if klass is NOT:
match, no_match = d.get("match"), d.get("no_match")
if match is None or no_match is None:
raise ValueError(f"{d!r} must have a 'match' and a 'no_match' key")
return klass(from_dict(match), from_dict(no_match))
assert klass is COLUMNFILTER
columns = d.get("columns")
if (
columns is None
or not isinstance(columns, Sequence)
or len(columns) < 1
or not all(isinstance(column, str) for column in columns)
):
raise ValueError(f"{d!r} must have 'columns' key with at least one member sequence, all of str")
filter = d.get("filter")
if filter != "include" and filter != "exclude":
raise ValueError(f"{d!r} must have 'filter' key with value of 'include' or 'exclude'")
query = d.get("query")
if query is None:
raise ValueError(f"{d!r} must have 'query' value")
return klass(columns, filter, from_dict(query))
def _type_check(v: Any, t: Any) -> Any:
if not isinstance(v, t):
raise TypeError(f"Expected {v!r} to be type {t}")
return v
# parentheses are not needed if the contained item has a lower
# priority than the container
_to_query_string_priority = {
OR: 10,
AND: 20,
NOT: 30,
# these are really all the same
COLUMNFILTER: 50,
NEAR: 60,
PHRASE: 80,
}
def _to_query_string_needs_parens(node: QUERY, child: QUERY) -> bool:
return _to_query_string_priority[type(child)] < _to_query_string_priority[type(node)]
def to_query_string(q: QUERY) -> str:
"""Returns the corresponding query in text format"""
if isinstance(q, PHRASE):
r = ""
if q.initial:
r += "^"
if isinstance(q.phrase, QueryTokens):
r += quote(q.phrase.encode())
else:
r += quote(q.phrase)
if q.prefix:
r += "*"
if q.plus:
r += " + " + to_query_string(q.plus)
return r
if isinstance(q, OR):
r = ""
for i, query in enumerate(q.queries):
if i:
r += " OR "
# the parens case is never hit because OR is the lowest priority
assert not _to_query_string_needs_parens(q, query)
r += to_query_string(query)
return r
if isinstance(q, AND):
r = ""
# see parse_implicit_and()
implicit_and = (PHRASE, NEAR, COLUMNFILTER)
for i, query in enumerate(q.queries):
if i:
if isinstance(q.queries[i], implicit_and) and isinstance(q.queries[i - 1], implicit_and):
r += " "
else:
r += " AND "
if _to_query_string_needs_parens(q, query):
r += "("
r += to_query_string(query)
if _to_query_string_needs_parens(q, query):
r += ")"
return r
if isinstance(q, NOT):
r = ""
if _to_query_string_needs_parens(q, q.match):
r += "("
r += to_query_string(q.match)
if _to_query_string_needs_parens(q, q.match):
r += ")"
r += " NOT "
if _to_query_string_needs_parens(q, q.no_match):
r += "("
r += to_query_string(q.no_match)
if _to_query_string_needs_parens(q, q.no_match):
r += ")"
return r
if isinstance(q, NEAR):
r = "NEAR(" + " ".join(to_query_string(phrase) for phrase in q.phrases)
if q.distance != 10:
r += f", {q.distance}"
r += ")"
return r
if isinstance(q, COLUMNFILTER):
r = ""
if q.filter == "exclude":
r += "-"
if len(q.columns) > 1:
r += "{"
for i, column in enumerate(q.columns):
if i:
r += " "
r += quote(column)
if len(q.columns) > 1:
r += "}"
r += ": "
if isinstance(q.query, (PHRASE, NEAR, COLUMNFILTER)):
r += to_query_string(q.query)
else:
r += "(" + to_query_string(q.query) + ")"
return r
raise TypeError(f"Unexpected query item {q!r}")
def parse_query_string(query: str) -> QUERY:
"Returns the corresponding :class:`QUERY` for the query string"
return _Parser(query).parsed
def quote(text: str | QueryTokens) -> str:
"""Quotes text if necessary to keep it as one unit using FTS5 quoting rules
Some examples:
.. list-table::
:widths: auto
:header-rows: 1
* - text
- return
* - ``hello``
- ``hello``
* - ``one two``
- ``"one two"``
* - (empty string)
- ``""``
* - ``one"two``
- ``"one""two"``
"""
# technically this will also apply to None and empty lists etc
if not text:
return '""'
if isinstance(text, QueryTokens):
return quote(text.encode())
if any(c not in "0123456789_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" and ord(c) < 0x80 for c in text):
return '"' + text.replace('"', '""') + '"'
return text
_walk_attrs = {
# sequences (iterable)
NEAR: ("phrases",),
AND: ("queries",),
OR: ("queries",),
# non-iterable
NOT: ("match", "no_match"),
COLUMNFILTER: ("query",),
}
if sys.version_info >= (3, 10):
def _is_QUERY(obj):
return isinstance(obj, QUERY)
else:
# py 3.9 can't do the above so we always return True. Providing a
# non-query will result in an inscrutable error lower in walk
def _is_QUERY(obj):
return True
def walk(start: QUERY) -> Iterator[tuple[tuple[QUERY, ...], QUERY]]:
"""Yields the parents and each node for a query recursively
The query tree is traversed top down. Use it like this::
for parents, node in walk(query):
# parents will be a tuple of parent nodes
# node will be current node
if isinstance(node, PHRASE):
print(node.phrase)
"""
if not _is_QUERY(start):
raise TypeError(f"{start} is not recognised as a QUERY")
# top down - container node first
yield tuple(), start
klass = type(start)
if klass is PHRASE:
# handled by yield at top of function
return
parent = (start,)
attrs = _walk_attrs[klass]
# attributes are not an iterable sequence
if klass in {COLUMNFILTER, NOT}:
for attr in attrs:
for parents, node in walk(getattr(start, attr)):
yield parent + parents, node
return
for attr in attrs:
for child in getattr(start, attr):
for parents, node in walk(child):
yield parent + parents, node
return
def extract_with_column_filters(node: QUERY, start: QUERY) -> QUERY:
"""Return a new `QUERY` for a query rooted at `start` with child `node`,
with intermediate :class:`COLUMNFILTER` in between applied.
This is useful if you want to execute a node from a top level
query ensuring the column filters apply.
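
For example::

    query = parse_query_string("title: (one OR two)")
    # the PHRASE for 'two' inside the OR
    node = query.query.queries[1]
    extract_with_column_filters(node, query)
    # -> COLUMNFILTER(columns=['title'], filter='include',
    #                 query=PHRASE(phrase='two', initial=False,
    #                              prefix=False, plus=None))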
"""
for parents, child in walk(start):
if child is node:
res = node
for parent in reversed(parents):
if isinstance(parent, COLUMNFILTER):
res = COLUMNFILTER(parent.columns, parent.filter, res)
return res
raise ValueError("node is not part of query")
def applicable_columns(node: QUERY, start: QUERY, columns: Sequence[str]) -> set[str]:
"""Return which columns apply to ``node``
You can use :meth:`apsw.fts5.Table.columns_indexed` to get
the column list for a table. The column names are matched using
SQLite semantics (ASCII case insensitive).
If a query column is not in the provided columns, then
:exc:`KeyError` is raised.
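
For example::

    query = parse_query_string("-title: hello")
    node = query.query  # the PHRASE for 'hello'
    applicable_columns(node, query, ["title", "summary", "body"])
    # -> {'summary', 'body'} (a set, so ordering is arbitrary)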
"""
query = extract_with_column_filters(node, start)
columns: set[str] = set(columns)
while query is not node:
matches = set()
for query_column in query.columns:
for column in columns:
if 0 == apsw.stricmp(query_column, column):
matches.add(column)
break
else:
raise KeyError(f"No column matching '{query_column}'")
if query.filter == "include":
columns = matches
else:
columns -= matches
query = query.query
return columns
def _flatten(start: QUERY):
"""Reduces nesting depth
For example if an AND contains a child AND then the children can be
merged into the parent.
If nodes with children (OR / AND) only have one child then they
can be replaced with the child.
:meta private:
"""
for _, node in walk(start):
if isinstance(node, AND):
# children have to be flattened bottom up
for child in node.queries:
_flatten(child)
if any(isinstance(child, AND) for child in node.queries):
new_queries: list[QUERY] = []
for child in node.queries:
if isinstance(child, AND):
new_queries.extend(child.queries)
else:
new_queries.append(child)
node.queries = new_queries
class ParseError(Exception):
"""This exception is raised when an error parsing a query string is encountered
A simple printer::

    print(exc.query)
    print(" " * exc.position + "^", exc.message)
"""
query: str
"The query that was being processed"
message: str
"Description of error"
position: int
"Offset in query where the error occurred"
def __init__(self, query: str, message: str, position: int):
self.query = query
self.message = message
self.position = position
class _Parser:
"""The query tokenization and parsing all in one namespace"""
class TokenType:
# these are assigned the same values as generated by
# Lemon (fts5parse.h), because why not
EOF = 0
OR = 1
AND = 2
NOT = 3
TERM = 4
COLON = 5
MINUS = 6
LCP = 7
RCP = 8
STRING = 9
LP = 10
RP = 11
CARET = 12
COMMA = 13
PLUS = 14
STAR = 15
# Add our own
NEAR = 16
@dataclasses.dataclass
class Token:
tok: _Parser.TokenType
pos: int
value: str | None = None
def __init__(self, query: str):
self.query = query
self.tokens = self.get_tokens(query)
self.token_pos = -1
if len(self.tokens) == 1: # only EOF present
# SQLite says "syntax error" as the message
self.error("No query provided", None)
parsed = self.parse_query()
if self.lookahead.tok != _Parser.TokenType.EOF:
self.error("Unexpected", self.lookahead)
self.parsed = parsed
def error(self, message: str, token: Token | None) -> NoReturn:
raise ParseError(self.query, message, token.pos if token else 0)
def _lookahead(self) -> Token:
return self.tokens[self.token_pos + 1]
lookahead = property(_lookahead, doc="Lookahead at next token")
def take_token(self) -> Token:
self.token_pos += 1
return self.tokens[self.token_pos]
infix_precedence = {
TokenType.OR: 10,
TokenType.AND: 20,
TokenType.NOT: 30,
}
def parse_query(self, rbp: int = 0) -> QUERY:
res = self.parse_implicit_and()
while rbp < self.infix_precedence.get(self.lookahead.tok, 0):
token = self.take_token()
res = self.infix(token.tok, res, self.parse_query(self.infix_precedence[token.tok]))
return res
def parse_implicit_and(self) -> QUERY:
# From FTS5 doc:
# any sequence of phrases or NEAR groups (including those
# restricted to matching specified columns) separated only by
# whitespace are handled as if there were an implicit AND
# operator between each pair of phrases or NEAR groups.
# Implicit AND operators are never inserted after or before an
# expression enclosed in parenthesis. Implicit AND operators
# group more tightly than all other operators, including NOT.
sequence: list[QUERY] = []
sequence.append(self.parse_part())
while self.lookahead.tok in {
_Parser.TokenType.MINUS,
_Parser.TokenType.LCP,
_Parser.TokenType.NEAR,
_Parser.TokenType.CARET,
_Parser.TokenType.STRING,
}:
# there is no implicit AND after (query) so we need to
# reject if current token is ) but it is fine after a NEAR
if self.tokens[self.token_pos].tok == _Parser.TokenType.RP and not isinstance(sequence[-1], NEAR):
break
sequence.append(self.parse_part())
return sequence[0] if len(sequence) == 1 else AND(queries=sequence)
def parse_part(self) -> QUERY:
if self.lookahead.tok in {_Parser.TokenType.MINUS, _Parser.TokenType.LCP} or (
self.lookahead.tok == _Parser.TokenType.STRING
and self.tokens[self.token_pos + 2].tok == _Parser.TokenType.COLON
):
return self.parse_colspec()
if self.lookahead.tok == _Parser.TokenType.LP:
token = self.take_token()
query = self.parse_query()
if self.lookahead.tok != _Parser.TokenType.RP:
if self.lookahead.tok == _Parser.TokenType.EOF:
self.error("unclosed (", token)
else:
self.error(f"Expected ) to close ( at position { token.pos}", self.lookahead)
self.take_token()
return query
if self.lookahead.tok == _Parser.TokenType.NEAR:
return self.parse_near()
return self.parse_phrase()
def parse_phrase(self) -> PHRASE:
if self.lookahead.tok not in {_Parser.TokenType.CARET, _Parser.TokenType.STRING}:
self.error("Expected a search term", self.lookahead)
initial = False
sequence: list[PHRASE] = []
if self.lookahead.tok == _Parser.TokenType.CARET:
initial = True
self.take_token()
while True:
token = self.take_token()
if token.tok != _Parser.TokenType.STRING:
self.error("Expected a search term", token)
prefix = False
if self.lookahead.tok == _Parser.TokenType.STAR:
prefix = True
self.take_token()
phrase = QueryTokens.decode(token.value) or token.value
sequence.append(PHRASE(phrase, initial, prefix))
if len(sequence) >= 2:
sequence[-2].plus = sequence[-1]
initial = False
if self.lookahead.tok != _Parser.TokenType.PLUS:
break
self.take_token()
return sequence[0]
def parse_near(self):
# swallow NEAR and open parentheses
self.take_token()
self.take_token()
# phrases - despite what the doc implies, you can do NEAR(one+two)
phrases: list[PHRASE] = []
while self.lookahead.tok not in (_Parser.TokenType.COMMA, _Parser.TokenType.RP):
phrases.append(self.parse_phrase())
# the doc says that at least two phrases are required, but the
# implementation accepts just one
# https://sqlite.org/forum/forumpost/6303d75d63
if len(phrases) < 1:
self.error("Expected phrase", self.lookahead)
# , distance
distance = 10 # default
if self.lookahead.tok == _Parser.TokenType.COMMA:
# absorb comma
self.take_token()
# distance
number = self.take_token()
if (
number.tok != _Parser.TokenType.STRING
or not number.value.isdigit()
# this verifies the number was bare and not quoted like
# NEAR(foo, "10")
or self.query[number.pos] == '"'
):
self.error("Expected number", number)
distance = int(number.value)
# close parentheses
if self.lookahead.tok != _Parser.TokenType.RP:
self.error("Expected )", self.lookahead)
self.take_token()
return NEAR(phrases, distance)
def parse_colspec(self):
include = True
columns: list[str] = []
if self.lookahead.tok == _Parser.TokenType.MINUS:
include = False
self.take_token()
# inside curlys?
if self.lookahead.tok == _Parser.TokenType.LCP:
self.take_token()
while self.lookahead.tok == _Parser.TokenType.STRING:
columns.append(self.take_token().value)
if len(columns) == 0:
self.error("Expected column name", self.lookahead)
if self.lookahead.tok != _Parser.TokenType.RCP:
self.error("Expected }", self.lookahead)
self.take_token()
else:
if self.lookahead.tok != _Parser.TokenType.STRING:
self.error("Expected column name", self.lookahead)
columns.append(self.take_token().value)
if self.lookahead.tok != _Parser.TokenType.COLON:
self.error("Expected :", self.lookahead)
self.take_token()
if self.lookahead.tok == _Parser.TokenType.LP:
query = self.parse_part()
elif self.lookahead.tok == _Parser.TokenType.NEAR:
query = self.parse_near()
else:
query = self.parse_phrase()
return COLUMNFILTER(columns, "include" if include else "exclude", query)
def infix(self, op: _Parser.TokenType, left: QUERY, right: QUERY) -> QUERY:
if op == _Parser.TokenType.NOT:
return NOT(left, right)
klass = {_Parser.TokenType.AND: AND, _Parser.TokenType.OR: OR}[op]
return klass([left, right])
## Tokenization stuff follows. It is all in this parser class
# to avoid namespace pollution
single_char_tokens = {
"(": TokenType.LP,
")": TokenType.RP,
"{": TokenType.LCP,
"}": TokenType.RCP,
":": TokenType.COLON,
",": TokenType.COMMA,
"+": TokenType.PLUS,
"*": TokenType.STAR,
"-": TokenType.MINUS,
"^": TokenType.CARET,
}
# case sensitive
special_words = {
"OR": TokenType.OR,
"NOT": TokenType.NOT,
"AND": TokenType.AND,
"NEAR": TokenType.NEAR,
}
def get_tokens(self, query: str) -> list[Token]:
def skip_spacing():
"Return True if we skipped any spaces"
nonlocal pos
original_pos = pos
# fts5ExprIsspace
while query[pos] in " \t\n\r":
pos += 1
if pos == len(query):
return True
return pos != original_pos
def absorb_quoted():
nonlocal pos
if query[pos] != '"':
return False
# two quotes in a row keeps one and continues string
start = pos + 1
while True:
found = query.find('"', pos + 1)
if found < 0:
raise ParseError(query, "No ending double quote", start - 1)
pos = found
if query[pos : pos + 2] == '""':
pos += 1
continue
break
res.append(_Parser.Token(_Parser.TokenType.STRING, start - 1, query[start:pos].replace('""', '"')))
pos += 1
return True
def absorb_bareword():
nonlocal pos
start = pos
while pos < len(query):
# sqlite3Fts5IsBareword
if (
query[pos] in "0123456789_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz\x1a"
or ord(query[pos]) >= 0x80
):
pos += 1
else:
break
if pos != start:
s = query[start:pos]
res.append(_Parser.Token(self.special_words.get(s, _Parser.TokenType.STRING), start, s))
return True
return False
res: list[_Parser.Token] = []
pos = 0
while pos < len(query):
if skip_spacing():
continue
tok = self.single_char_tokens.get(query[pos])
if tok is not None:
res.append(_Parser.Token(tok, pos))
pos += 1
continue
if absorb_quoted():
continue
if absorb_bareword():
continue
raise ParseError(query, f"Invalid query character '{query[pos]}'", pos)
# add explicit EOF
res.append(_Parser.Token(_Parser.TokenType.EOF, pos))
# fts5 promotes STRING "NEAR" to token NEAR only if followed by "("
# we demote to get the same effect
for i in range(len(res) - 1):
if res[i].tok == _Parser.TokenType.NEAR and res[i + 1].tok != _Parser.TokenType.LP:
res[i].tok = _Parser.TokenType.STRING
return res