# Process FTS5 queries as documented at https://www.sqlite.org/fts5.html#full_text_query_syntax

# The actual Lemon grammar used is at
# https://sqlite.org/src/file?name=ext/fts5/fts5parse.y

# Tokens https://sqlite.org/src/file?name=ext/fts5/fts5_expr.c
# fts5ExprGetToken

"""
:mod:`apsw.fts5query` Create, parse, and modify queries

There are 3 representations of a query available:

query string

   This is the string syntax `accepted by FTS5
   <https://www.sqlite.org/fts5.html#full_text_query_syntax>`__ where
   you represent AND, OR, NEAR, column filtering etc inline in the
   string.  An example is::

     love AND (title:^"big world" NOT summary:"sunset cruise")

parsed

    This is a hierarchical representation using :mod:`dataclasses`
    with all fields present.  Represented as :class:`QUERY`, it uses
    :class:`PHRASE`,  :class:`NEAR`, :class:`COLUMNFILTER`,
    :class:`AND`, :class:`NOT`, and :class:`OR`.  The string example
    above is::

        AND(queries=[PHRASE(phrase='love', initial=False, prefix=False, plus=None),
                     NOT(match=COLUMNFILTER(columns=['title'],
                                            filter='include',
                                            query=PHRASE(phrase='big world',
                                                         initial=True,
                                                         prefix=False,
                                                         plus=None)),
                         no_match=COLUMNFILTER(columns=['summary'],
                                               filter='include',
                                               query=PHRASE(phrase='sunset cruise',
                                                            initial=False,
                                                            prefix=False,
                                                            plus=None)))])


dict

    This is a hierarchical representation using Python
    :class:`dictionaries <dict>` which is easy for logging, storing as
    JSON, and manipulating.  Fields containing default values are
    omitted.  When provided to methods in this module, you do not need
    to provide intermediate PHRASE nodes - plain Python lists and
    strings are accepted directly.  This is the easiest form in which
    to programmatically compose and modify queries.  The string example
    above is::

        {'@': 'AND',
         'queries': [{'@': 'PHRASE', 'phrase': 'love'},
                     {'@': 'NOT',
                      'match': {'@': 'COLUMNFILTER',
                                'columns': ['title'],
                                'filter': 'include',
                                'query': {'@': 'PHRASE',
                                          'initial': True,
                                          'phrase': 'big world'}},
                      'no_match': {'@': 'COLUMNFILTER',
                                   'columns': ['summary'],
                                   'filter': 'include',
                                   'query': {'@': 'PHRASE',
                                             'phrase': 'sunset cruise'}}}]}

See :ref:`the example <example_fts_query>`.


.. list-table:: Conversion functions
    :header-rows: 1
    :widths: auto

    * - From type
      - To type
      - Conversion method
    * - query string
      - parsed
      - :func:`parse_query_string`
    * - parsed
      - dict
      - :func:`to_dict`
    * - dict
      - parsed
      - :func:`from_dict`
    * - parsed
      - query string
      - :func:`to_query_string`
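
A minimal round trip through all four conversions::

    q = parse_query_string('hello AND world')
    d = to_dict(q)            # dict form, e.g. for storing as JSON
    q2 = from_dict(d)         # back to the parsed form
    s = to_query_string(q2)   # 'hello world' - an implicit AND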

Other helpful functionality includes:

* :func:`quote` to appropriately double quote strings
* :func:`extract_with_column_filters` to get a :class:`QUERY` for a node within
  an existing :class:`QUERY`, applying the intermediate column filters
* :func:`applicable_columns` to work out which columns apply to part of a
  :class:`QUERY`
* :func:`walk` to traverse a parsed query (see the sketch below)
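
For example, to print each phrase together with the columns that apply
to it (a small sketch, assuming a table indexing columns ``title`` and
``summary``)::

    query = parse_query_string('title: (hello OR world)')
    for _, node in walk(query):
        if isinstance(node, PHRASE):
            print(node.phrase, applicable_columns(node, query, ["title", "summary"]))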
"""

from __future__ import annotations

import sys

import dataclasses

try:
    from typing import Union, Any, Sequence, NoReturn, Literal, Iterator, TypeAlias
except ImportError:
    # TypeAlias is not in Python <= 3.9
    from typing import Union, Any, Sequence, NoReturn, Literal, Iterator

import apsw

QUERY_TOKENS_MARKER = "$!Tokens~"
"Special marker at the start of a string to recognise it as a list of tokens for :class:`QueryTokens`"


@dataclasses.dataclass
class QueryTokens:
    """`FTS5 query strings <https://www.sqlite.org/fts5.html#fts5_strings>`__
    are passed to `tokenizers <https://www.sqlite.org/fts5.html#tokenizers>`__
    which extract tokens, such as by splitting on whitespace, lower casing
    text, and removing characters like accents.

    If you want to query tokens directly then use this class with the
    :attr:`tokens` member, using it where :attr:`PHRASE.phrase` goes, and use
    :func:`to_query_string` to compose your query.

    Your FTS5 table must use :class:`apsw.fts5.QueryTokensTokenizer` as the
    first tokenizer in the list.  If the reason for tokenizing includes
    ``FTS5_TOKENIZE_QUERY`` and the text to be tokenized starts with the
    special marker, then the tokens are returned.
    :attr:`apsw.fts5.Table.supports_query_tokens` will tell you if query
    tokens are handled correctly.  The :meth:`apsw.fts5.Table.create`
    parameter ``support_query_tokens`` will ensure the ``tokenize`` table
    option is correctly set.  You can get the tokens from
    :attr:`apsw.fts5.Table.tokens`.

    You can construct QueryTokens like this::

        # One token
        QueryTokens(["hello"])
        # Token sequence
        QueryTokens(["hello", "world", "today"])
        # Colocated tokens use a nested list
        QueryTokens(["hello", ["first", "1st"]])

    To use in a query::

        {"@": "NOT",
         "match": QueryTokens(["hello", "world"]),
         "no_match": QueryTokens([["first", "1st"]])}

    That would be equivalent to a query of ``"Hello World" NOT "First"`` if
    tokens were lower cased, and a tokenizer added a colocated ``1st`` on
    seeing ``first``.
    """

    tokens: list[str | Sequence[str]]
    "The tokens"

    @classmethod
    def _zero_encode(cls, s: str) -> str:
        "Encode any zero bytes"
        return s.replace("\0", "$!ZeRo")

    @classmethod
    def _zero_decode(cls, s: str) -> str:
        "Decode any zero bytes"
        return s.replace("$!ZeRo", "\0")

    def encode(self) -> str:
        "Produces the tokens encoded with the marker and separator"
        res = ""
        for token in self.tokens:
            if res:
                res += "|"
            if isinstance(token, str):
                res += self._zero_encode(token)
            else:
                res += ">".join(self._zero_encode(t) for t in token)
        return QUERY_TOKENS_MARKER + res

    @classmethod
    def decode(cls, data: str | bytes) -> QueryTokens | None:
        "If the marker is present then returns the corresponding :class:`QueryTokens`, otherwise ``None``."
        if isinstance(data, bytes) and data.startswith(b"$!Tokens~"):
            data = data.decode()
        if isinstance(data, str) and data.startswith(QUERY_TOKENS_MARKER):
            stream: list[str | Sequence[str]] = [
                cls._zero_decode(token) for token in data[len(QUERY_TOKENS_MARKER) :].split("|")
            ]
            for i, token in enumerate(stream):
                if ">" in token:
                    stream[i] = tuple(token.split(">"))
            return cls(stream)
        return None
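

# A minimal sketch (a hypothetical helper, not part of the module API)
# showing that encode() and decode() round trip, including a colocated
# token pair.
def _querytokens_round_trip_example() -> None:
    qt = QueryTokens(["hello", ("first", "1st")])
    encoded = qt.encode()  # "$!Tokens~hello|first>1st"
    assert QueryTokens.decode(encoded) == qt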


@dataclasses.dataclass
class PHRASE:
    "One `phrase <https://www.sqlite.org/fts5.html#fts5_phrases>`__"

    phrase: str | QueryTokens
    "Text of the phrase"

    initial: bool = False
    "If True then the phrase must match the beginning of a column (``^`` was used)"

    prefix: bool = False
    "If True then it is a prefix search on the last token in the phrase (``*`` was used)"

    plus: PHRASE | None = None
    "Additional phrase segment, joined by ``+`` in queries"


@dataclasses.dataclass
class NEAR:
    "`Near query <https://www.sqlite.org/fts5.html#fts5_near_queries>`__"

    phrases: Sequence[PHRASE]
    "Two or more phrases"

    distance: int = 10
    "Maximum distance between the phrases"


@dataclasses.dataclass
class COLUMNFILTER:
    """Limit query to `certain columns
    <https://www.sqlite.org/fts5.html#fts5_column_filters>`__

    This always reduces the columns that phrase matching will be done
    against.
    """

    columns: Sequence[str]
    "Limit phrase matching by these columns"

    filter: Literal["include", "exclude"]
    "Including or excluding the columns"

    query: QUERY
    "Query the filter applies to, including all nested queries"


@dataclasses.dataclass
class AND:
    "All queries `must match <https://www.sqlite.org/fts5.html#fts5_boolean_operators>`__"

    queries: Sequence[QUERY]


@dataclasses.dataclass
class OR:
    "Any query `must match <https://www.sqlite.org/fts5.html#fts5_boolean_operators>`__"

    queries: Sequence[QUERY]


@dataclasses.dataclass
class NOT:
    "``match`` `must match <https://www.sqlite.org/fts5.html#fts5_boolean_operators>`__, but ``no_match`` `must not <https://www.sqlite.org/fts5.html#fts5_boolean_operators>`__"

    match: QUERY
    no_match: QUERY


# Sphinx makes this real ugly
# https://github.com/sphinx-doc/sphinx/issues/10541
QUERY: TypeAlias = Union[COLUMNFILTER, NEAR, AND, OR, NOT, PHRASE]
"""Type representing all query types."""
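

# A minimal sketch (a hypothetical helper): composing a parsed query
# directly from the dataclasses; equivalent to the query string
# 'hello NOT world'.
def _dataclass_compose_example() -> QUERY:
    return NOT(match=PHRASE("hello"), no_match=PHRASE("world"))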


def to_dict(q: QUERY) -> dict[str, Any]:
    """Converts structure to a dict

    This is useful for pretty printing, logging, saving as JSON,
    modifying etc.

    The dict has a key ``@`` with value corresponding to the dataclass
    (eg ``NEAR``, ``PHRASE``, ``AND``) and the same field names as the
    corresponding dataclasses.  Only fields with non-default values are
    emitted.
    """
    # @ was picked because it gets printed first if dict keys are
    # sorted, and won't conflict with any other key names
    if isinstance(q, PHRASE):
        res = {"@": "PHRASE", "phrase": q.phrase}
        if q.prefix:
            res["prefix"] = True
        if q.initial:
            res["initial"] = True
        if q.plus:
            res["plus"] = to_dict(q.plus)
        return res

    if isinstance(q, AND):
        return {"@": "AND", "queries": [to_dict(query) for query in q.queries]}

    if isinstance(q, OR):
        return {"@": "OR", "queries": [to_dict(query) for query in q.queries]}

    if isinstance(q, NOT):
        return {"@": "NOT", "match": to_dict(q.match), "no_match": to_dict(q.no_match)}

    if isinstance(q, NEAR):
        res = {"@": "NEAR", "phrases": [to_dict(phrase) for phrase in q.phrases]}
        if q.distance != 10:
            res["distance"] = q.distance
        return res

    if isinstance(q, COLUMNFILTER):
        return {"@": "COLUMNFILTER", "query": to_dict(q.query), "columns": q.columns, "filter": q.filter}

    raise TypeError(f"Unexpected value {q=}")


_dict_name_class = {
    "PHRASE": PHRASE,
    "NEAR": NEAR,
    "COLUMNFILTER": COLUMNFILTER,
    "AND": AND,
    "OR": OR,
    "NOT": NOT,
}


def from_dict(d: dict[str, Any] | Sequence[str] | str | QueryTokens) -> QUERY:
    """Turns a dict back into a :class:`QUERY`

    You can take shortcuts, putting :class:`str` or :class:`QueryTokens`
    in places where PHRASE is expected.  For example this is accepted::

        {
            "@": "AND",
            "queries": ["hello", "world"]
        }
    """
    if isinstance(d, (str, QueryTokens)):
        return PHRASE(d)

    if isinstance(d, (Sequence, set)):
        res = AND([from_dict(item) for item in d])
        if len(res.queries) == 0:
            raise ValueError(f"Expected at least one item in {d!r}")
        if len(res.queries) == 1:
            return res.queries[0]
        return res

    _type_check(d, dict)

    if "@" not in d:
        raise ValueError(f"Expected key '@' in dict {d!r}")

    klass = _dict_name_class.get(d["@"])
    if klass is None:
        raise ValueError(f"\"{d['@']}\" is not a known query type")

    if klass is PHRASE:
        res = PHRASE(
            _type_check(d["phrase"], (str, QueryTokens)),
            initial=_type_check(d.get("initial", False), bool),
            prefix=_type_check(d.get("prefix", False), bool),
        )
        if "plus" in d:
            res.plus = _type_check(from_dict(d["plus"]), PHRASE)
        return res

    if klass is OR or klass is AND:
        queries = d.get("queries")
        if not isinstance(queries, (Sequence, set)) or len(queries) < 1:
            raise ValueError(f"{d!r} 'queries' must be a sequence of at least one item")
        as_queries = [from_dict(query) for query in queries]
        if len(as_queries) == 1:
            return as_queries[0]
        return klass(as_queries)

    if klass is NEAR:
        phrases = [_type_check(from_dict(phrase), PHRASE) for phrase in d["phrases"]]
        if len(phrases) < 1:
            raise ValueError(f"There must be at least one NEAR phrase in {phrases!r}")
        res = klass(phrases, _type_check(d.get("distance", 10), int))
        if res.distance < 1:
            raise ValueError(f"NEAR distance must be at least one in {d!r}")
        return res

    if klass is NOT:
        match, no_match = d.get("match"), d.get("no_match")
        if match is None or no_match is None:
            raise ValueError(f"{d!r} must have a 'match' and a 'no_match' key")
        return klass(from_dict(match), from_dict(no_match))

    assert klass is COLUMNFILTER

    columns = d.get("columns")
    if (
        columns is None
        or not isinstance(columns, Sequence)
        or len(columns) < 1
        or not all(isinstance(column, str) for column in columns)
    ):
        raise ValueError(f"{d!r} must have a 'columns' key with a sequence of at least one member, all of str")
    filter = d.get("filter")
    if filter != "include" and filter != "exclude":
        raise ValueError(f"{d!r} must have a 'filter' key with value of 'include' or 'exclude'")
    query = d.get("query")
    if query is None:
        raise ValueError(f"{d!r} must have a 'query' value")
    return klass(columns, filter, from_dict(query))
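

# A minimal sketch (a hypothetical helper) of the from_dict() shortcuts: a
# plain string becomes a PHRASE, and a list becomes an AND of its members.
def _from_dict_shortcut_example() -> None:
    assert from_dict("hello") == PHRASE("hello")
    q = from_dict(["hello", "world"])
    assert isinstance(q, AND) and len(q.queries) == 2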


def _type_check(v: Any, t: Any) -> Any:
    if not isinstance(v, t):
        raise TypeError(f"Expected {v!r} to be type {t}")
    return v


# parentheses are needed when the contained item has a lower
# binding priority than the container
_to_query_string_priority = {
    OR: 10,
    AND: 20,
    NOT: 30,
    # these are really all the same
    COLUMNFILTER: 50,
    NEAR: 60,
    PHRASE: 80,
}


def _to_query_string_needs_parens(node: QUERY, child: QUERY) -> bool:
    return _to_query_string_priority[type(child)] < _to_query_string_priority[type(node)]


def to_query_string(q: QUERY) -> str:
    """Returns the corresponding query in text format"""
    if isinstance(q, PHRASE):
        r = ""
        if q.initial:
            r += "^"
        if isinstance(q.phrase, QueryTokens):
            r += quote(q.phrase.encode())
        else:
            r += quote(q.phrase)
        if q.prefix:
            r += "*"
        if q.plus:
            r += " + " + to_query_string(q.plus)
        return r

    if isinstance(q, OR):
        r = ""
        for i, query in enumerate(q.queries):
            if i:
                r += " OR "
            # parens are never needed because OR is the lowest priority
            assert not _to_query_string_needs_parens(q, query)
            r += to_query_string(query)
        return r

    if isinstance(q, AND):
        r = ""
        # see parse_implicit_and()
        implicit_and = (PHRASE, NEAR, COLUMNFILTER)
        for i, query in enumerate(q.queries):
            if i:
                if isinstance(q.queries[i], implicit_and) and isinstance(q.queries[i - 1], implicit_and):
                    r += " "
                else:
                    r += " AND "
            if _to_query_string_needs_parens(q, query):
                r += "("
            r += to_query_string(query)
            if _to_query_string_needs_parens(q, query):
                r += ")"
        return r

    if isinstance(q, NOT):
        r = ""
        if _to_query_string_needs_parens(q, q.match):
            r += "("
        r += to_query_string(q.match)
        if _to_query_string_needs_parens(q, q.match):
            r += ")"
        r += " NOT "
        if _to_query_string_needs_parens(q, q.no_match):
            r += "("
        r += to_query_string(q.no_match)
        if _to_query_string_needs_parens(q, q.no_match):
            r += ")"
        return r

    if isinstance(q, NEAR):
        r = "NEAR(" + " ".join(to_query_string(phrase) for phrase in q.phrases)
        if q.distance != 10:
            r += f", {q.distance}"
        r += ")"
        return r

    if isinstance(q, COLUMNFILTER):
        r = ""
        if q.filter == "exclude":
            r += "-"
        if len(q.columns) > 1:
            r += "{"
        for i, column in enumerate(q.columns):
            if i:
                r += " "
            r += quote(column)
        if len(q.columns) > 1:
            r += "}"
        r += ": "
        if isinstance(q.query, (PHRASE, NEAR, COLUMNFILTER)):
            r += to_query_string(q.query)
        else:
            r += "(" + to_query_string(q.query) + ")"
        return r

    raise TypeError(f"Unexpected query item {q!r}")
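

# A minimal sketch (a hypothetical helper): to_query_string() adds
# parentheses only when a child binds less tightly than its container,
# such as an OR inside an AND.
def _to_query_string_parens_example() -> None:
    q = AND([PHRASE("hello"), OR([PHRASE("big"), PHRASE("world")])])
    assert to_query_string(q) == "hello AND (big OR world)"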


def parse_query_string(query: str) -> QUERY:
    "Returns the corresponding :class:`QUERY` for the query string"
    return _Parser(query).parsed


def quote(text: str | QueryTokens) -> str:
    """Quotes text if necessary to keep it as one unit using FTS5 quoting rules

    Some examples:

    .. list-table::
        :widths: auto
        :header-rows: 1

        * - text
          - return
        * - ``hello``
          - ``hello``
        * - ``one two``
          - ``"one two"``
        * - (empty string)
          - ``""``
        * - ``one"two``
          - ``"one""two"``
    """
    # technically this will also apply to None and empty lists etc
    if not text:
        return '""'
    if isinstance(text, QueryTokens):
        return quote(text.encode())
    if any(c not in "0123456789_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" and ord(c) < 0x80 for c in text):
        return '"' + text.replace('"', '""') + '"'
    return text


_walk_attrs = {
    # sequences (iterable)
    NEAR: ("phrases",),
    AND: ("queries",),
    OR: ("queries",),
    # non-iterable
    NOT: ("match", "no_match"),
    COLUMNFILTER: ("query",),
}

if sys.version_info >= (3, 10):

    def _is_QUERY(obj):
        return isinstance(obj, QUERY)

else:
    # Python 3.9 can't do the above, so we always return True.  Providing
    # a non-query will result in an inscrutable error lower in walk.
    def _is_QUERY(obj):
        return True


def walk(start: QUERY) -> Iterator[tuple[tuple[QUERY, ...], QUERY]]:
    """Yields the parents and each node for a query recursively

    The query tree is traversed top down.  Use it like this::

        for parents, node in walk(query):
            # parents will be a tuple of parent nodes
            # node will be the current node
            if isinstance(node, PHRASE):
                print(node.phrase)
    """
    if not _is_QUERY(start):
        raise TypeError(f"{start} is not recognised as a QUERY")

    # top down - container node first
    yield tuple(), start

    klass = type(start)
    if klass is PHRASE:
        # handled by the yield at the top of the function
        return

    parent = (start,)
    attrs = _walk_attrs[klass]

    # attributes that are not an iterable sequence
    if klass in {COLUMNFILTER, NOT}:
        for attr in attrs:
            for parents, node in walk(getattr(start, attr)):
                yield parent + parents, node
        return

    for attr in attrs:
        for child in getattr(start, attr):
            for parents, node in walk(child):
                yield parent + parents, node


def extract_with_column_filters(node: QUERY, start: QUERY) -> QUERY:
    """Return a new :class:`QUERY` for a query rooted at ``start`` with child
    ``node``, with the intermediate :class:`COLUMNFILTER` applied

    This is useful if you want to execute a node from a top level query
    while ensuring the column filters apply.
    """
    for parents, child in walk(start):
        if child is node:
            res = node
            for parent in reversed(parents):
                if isinstance(parent, COLUMNFILTER):
                    res = COLUMNFILTER(parent.columns, parent.filter, res)
            return res

    raise ValueError("node is not part of query")


def applicable_columns(node: QUERY, start: QUERY, columns: Sequence[str]) -> set[str]:
    """Return which columns apply to ``node``

    You can use :meth:`apsw.fts5.Table.columns_indexed` to get the column
    list for a table.  The column names are matched using SQLite semantics
    (ASCII case insensitive).

    If a query column is not in the provided columns, then :exc:`KeyError`
    is raised.
    """
    query = extract_with_column_filters(node, start)

    columns: set[str] = set(columns)

    while query is not node:
        matches = set()
        for query_column in query.columns:
            for column in columns:
                if 0 == apsw.stricmp(query_column, column):
                    matches.add(column)
                    break
            else:
                raise KeyError(f"No column matching '{query_column}'")
        if query.filter == "include":
            columns = matches
        else:
            columns -= matches
        query = query.query

    return columns


def _flatten(start: QUERY):
    """Reduces nesting depth

    For example if AND contains a child AND then the children can be
    merged into the parent.  If nodes with children (OR / AND) only have
    one child then they can be replaced with the child.

    :meta private:
    """
    for _, node in walk(start):
        if isinstance(node, AND):
            # children have to be flattened bottom up
            for child in node.queries:
                _flatten(child)
            if any(isinstance(child, AND) for child in node.queries):
                new_queries: list[QUERY] = []
                for child in node.queries:
                    if isinstance(child, AND):
                        new_queries.extend(child.queries)
                    else:
                        new_queries.append(child)
                node.queries = new_queries


class ParseError(Exception):
    """This exception is raised when an error parsing a query string is encountered

    A simple printer::

        print(exc.query)
        print(" " * exc.position + "^", exc.message)
    """

    query: str
    "The query that was being processed"

    message: str
    "Description of the error"

    position: int
    "Offset in the query where the error occurred"

    def __init__(self, query: str, message: str, position: int):
        self.query = query
        self.message = message
        self.position = position
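

# A minimal sketch (a hypothetical helper) using the printer recipe from
# the ParseError docstring to report where parsing failed.
def _parse_or_report(query: str) -> QUERY | None:
    try:
        return parse_query_string(query)
    except ParseError as exc:
        print(exc.query)
        print(" " * exc.position + "^", exc.message)
        return None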


class _Parser:
    """The query tokenization and parsing all in one namespace"""

    class TokenType:
        # these are assigned the same values as generated by
        # lemon, because why not.  fts5parse.h
        EOF = 0
        OR = 1
        AND = 2
        NOT = 3
        TERM = 4
        COLON = 5
        MINUS = 6
        LCP = 7
        RCP = 8
        STRING = 9
        LP = 10
        RP = 11
        CARET = 12
        COMMA = 13
        PLUS = 14
        STAR = 15
        # Add our own
        NEAR = 16

    @dataclasses.dataclass
    class Token:
        tok: _Parser.TokenType
        pos: int
        value: str | None = None

    def __init__(self, query: str):
        self.query = query
        self.tokens = self.get_tokens(query)
        self.token_pos = -1

        if len(self.tokens) == 1:
            # only EOF present
            # SQLite says "syntax error" as the message
            self.error("No query provided", None)

        parsed = self.parse_query()
        if self.lookahead.tok != _Parser.TokenType.EOF:
            self.error("Unexpected", self.lookahead)
        self.parsed = parsed

    def error(self, message: str, token: Token | None) -> NoReturn:
        raise ParseError(self.query, message, token.pos if token else 0)

    def _lookahead(self) -> Token:
        return self.tokens[self.token_pos + 1]

    lookahead = property(_lookahead, doc="Lookahead at next token")

    def take_token(self) -> Token:
        self.token_pos += 1
        return self.tokens[self.token_pos]

    infix_precedence = {
        TokenType.OR: 10,
        TokenType.AND: 20,
        TokenType.NOT: 30,
    }

    def parse_query(self, rbp: int = 0) -> QUERY:
        res = self.parse_implicit_and()
        while rbp < self.infix_precedence.get(self.lookahead.tok, 0):
            token = self.take_token()
            res = self.infix(token.tok, res, self.parse_query(self.infix_precedence[token.tok]))
        return res

    def parse_implicit_and(self) -> QUERY:
        # From the FTS5 doc:
        #
        # any sequence of phrases or NEAR groups (including those
        # restricted to matching specified columns) separated only by
        # whitespace are handled as if there were an implicit AND
        # operator between each pair of phrases or NEAR groups.
        # Implicit AND operators are never inserted after or before an
        # expression enclosed in parenthesis.  Implicit AND operators
        # group more tightly than all other operators, including NOT.
        sequence: list[QUERY] = []
        sequence.append(self.parse_part())
        while self.lookahead.tok in {
            _Parser.TokenType.MINUS,
            _Parser.TokenType.LCP,
            _Parser.TokenType.NEAR,
            _Parser.TokenType.CARET,
            _Parser.TokenType.STRING,
        }:
            # there is no implicit AND after (query) so we need to
            # reject if the current token is ) but it is fine after a NEAR
            if self.tokens[self.token_pos].tok == _Parser.TokenType.RP and not isinstance(sequence[-1], NEAR):
                break
            sequence.append(self.parse_part())
        return sequence[0] if len(sequence) == 1 else AND(queries=sequence)

    def parse_part(self) -> QUERY:
        if self.lookahead.tok in {_Parser.TokenType.MINUS, _Parser.TokenType.LCP} or (
            self.lookahead.tok == _Parser.TokenType.STRING
            and self.tokens[self.token_pos + 2].tok == _Parser.TokenType.COLON
        ):
            return self.parse_colspec()

        if self.lookahead.tok == _Parser.TokenType.LP:
            token = self.take_token()
            query = self.parse_query()
            if self.lookahead.tok != _Parser.TokenType.RP:
                if self.lookahead.tok == _Parser.TokenType.EOF:
                    self.error("unclosed (", token)
                else:
                    self.error(f"Expected ) to close ( at position {token.pos}", self.lookahead)
            self.take_token()
            return query

        if self.lookahead.tok == _Parser.TokenType.NEAR:
            return self.parse_near()

        return self.parse_phrase()

    def parse_phrase(self) -> PHRASE:
        if self.lookahead.tok not in {_Parser.TokenType.CARET, _Parser.TokenType.STRING}:
            self.error("Expected a search term", self.lookahead)

        initial = False
        sequence: list[PHRASE] = []

        if self.lookahead.tok == _Parser.TokenType.CARET:
            initial = True
            self.take_token()

        while True:
            token = self.take_token()
            if token.tok != _Parser.TokenType.STRING:
                self.error("Expected a search term", token)
            prefix = False
            if self.lookahead.tok == _Parser.TokenType.STAR:
                prefix = True
                self.take_token()
            phrase = QueryTokens.decode(token.value) or token.value
            sequence.append(PHRASE(phrase, initial, prefix))
            if len(sequence) >= 2:
                sequence[-2].plus = sequence[-1]
            initial = False
            if self.lookahead.tok != _Parser.TokenType.PLUS:
                break
            self.take_token()

        return sequence[0]

    def parse_near(self):
        # swallow NEAR and the open parenthesis
        self.take_token()
        self.take_token()

        # phrases - despite what the doc implies, you can do NEAR(one+two)
        phrases: list[PHRASE] = []
        while self.lookahead.tok not in (_Parser.TokenType.COMMA, _Parser.TokenType.RP):
            phrases.append(self.parse_phrase())

        # the doc says that at least two phrases are required, but the
        # implementation is otherwise
        # https://sqlite.org/forum/forumpost/6303d75d63
        if len(phrases) < 1:
            self.error("Expected phrase", self.lookahead)

        # , distance
        distance = 10  # default
        if self.lookahead.tok == _Parser.TokenType.COMMA:
            # absorb comma
            self.take_token()
            # distance
            number = self.take_token()
            if (
                number.tok != _Parser.TokenType.STRING
                or not number.value.isdigit()
                # this verifies the number was bare and not quoted like
                # NEAR(foo, "10")
                or self.query[number.pos] == '"'
            ):
                self.error("Expected number", number)
            distance = int(number.value)

        # close parenthesis
        if self.lookahead.tok != _Parser.TokenType.RP:
            self.error("Expected )", self.lookahead)
        self.take_token()

        return NEAR(phrases, distance)

    def parse_colspec(self):
        include = True
        columns: list[str] = []

        if self.lookahead.tok == _Parser.TokenType.MINUS:
            include = False
            self.take_token()

        # inside curly braces?
        if self.lookahead.tok == _Parser.TokenType.LCP:
            self.take_token()
            while self.lookahead.tok == _Parser.TokenType.STRING:
                columns.append(self.take_token().value)
            if len(columns) == 0:
                self.error("Expected column name", self.lookahead)
            if self.lookahead.tok != _Parser.TokenType.RCP:
                self.error("Expected }", self.lookahead)
            self.take_token()
        else:
            if self.lookahead.tok != _Parser.TokenType.STRING:
                self.error("Expected column name", self.lookahead)
            columns.append(self.take_token().value)

        if self.lookahead.tok != _Parser.TokenType.COLON:
            self.error("Expected :", self.lookahead)
        self.take_token()

        if self.lookahead.tok == _Parser.TokenType.LP:
            query = self.parse_part()
        elif self.lookahead.tok == _Parser.TokenType.NEAR:
            query = self.parse_near()
        else:
            query = self.parse_phrase()

        return COLUMNFILTER(columns, "include" if include else "exclude", query)

    def infix(self, op: _Parser.TokenType, left: QUERY, right: QUERY) -> QUERY:
        if op == _Parser.TokenType.NOT:
            return NOT(left, right)
        klass = {_Parser.TokenType.AND: AND, _Parser.TokenType.OR: OR}[op]
        return klass([left, right])

    # Tokenization stuff follows.  It is all in this parser class
    # to avoid namespace pollution

    single_char_tokens = {
        "(": TokenType.LP,
        ")": TokenType.RP,
        "{": TokenType.LCP,
        "}": TokenType.RCP,
        ":": TokenType.COLON,
        ",": TokenType.COMMA,
        "+": TokenType.PLUS,
        "*": TokenType.STAR,
        "-": TokenType.MINUS,
        "^": TokenType.CARET,
    }

    # case sensitive
    special_words = {
        "OR": TokenType.OR,
        "NOT": TokenType.NOT,
        "AND": TokenType.AND,
        "NEAR": TokenType.NEAR,
    }

    def get_tokens(self, query: str) -> list[Token]:
        def skip_spacing():
            "Return True if we skipped any spaces"
            nonlocal pos
            original_pos = pos
            # fts5ExprIsspace
            while query[pos] in " \t\n\r":
                pos += 1
                if pos == len(query):
                    return True
            return pos != original_pos

        def absorb_quoted():
            nonlocal pos
            if query[pos] != '"':
                return False
            # two quotes in a row keeps one and continues the string
            start = pos + 1
            while True:
                found = query.find('"', pos + 1)
                if found < 0:
                    raise ParseError(query, "No ending double quote", start - 1)
                pos = found
                if query[pos : pos + 2] == '""':
                    pos += 1
                    continue
                break
            res.append(_Parser.Token(_Parser.TokenType.STRING, start - 1, query[start:pos].replace('""', '"')))
            pos += 1
            return True

        def absorb_bareword():
            nonlocal pos
            start = pos
            while pos < len(query):
                # sqlite3Fts5IsBareword
                if (
                    query[pos] in "0123456789_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz\x1a"
                    or ord(query[pos]) >= 0x80
                ):
                    pos += 1
                else:
                    break
            if pos != start:
                s = query[start:pos]
                res.append(_Parser.Token(self.special_words.get(s, _Parser.TokenType.STRING), start, s))
                return True
            return False

        res: list[_Parser.Token] = []
        pos = 0

        while pos < len(query):
            if skip_spacing():
                continue
            tok = self.single_char_tokens.get(query[pos])
            if tok is not None:
                res.append(_Parser.Token(tok, pos))
                pos += 1
                continue
            if absorb_quoted():
                continue
            if absorb_bareword():
                continue
            raise ParseError(query, f"Invalid query character '{query[pos]}'", pos)

        # add explicit EOF
        res.append(_Parser.Token(_Parser.TokenType.EOF, pos))

        # fts5 promotes the STRING "NEAR" to the token NEAR only if it is
        # followed by "(" - we demote to get the same effect
        for i in range(len(res) - 1):
            if res[i].tok == _Parser.TokenType.NEAR and res[i + 1].tok != _Parser.TokenType.LP:
                res[i].tok = _Parser.TokenType.STRING

        return res