Source code for apsw.fts5aux

"""
:mod:`apsw.fts5aux` Implementation of FTS5 auxiliary functions in Python.

Auxiliary functions are used for ranking results, and for processing search
results.

"""

from __future__ import annotations

import dataclasses
import math

import apsw


# This section is a translation of the bm25 C code from the `SQLite
# source https://sqlite.org/src/file?name=ext/fts5/fts5_aux.c serving
# as an example of how to write a ranking function.  It uses the same
# naming conventions and code structure.


@dataclasses.dataclass
class _Bm25Data:
    """Statistical information about each query overall"""

    nPhrase: int
    "Number of phrases in query"
    avgdl: float
    "Average number of tokens in each row"
    aIDF: list[float]
    "Inverse Document Frequency for each phrase"
    weights: list[float]
    "Per column weight - how much each occurrence counts for, defaulting to 1"


def _Bm25GetData(api: apsw.FTS5ExtensionApi, args: apsw.SQLiteValues) -> _Bm25Data:
    """Calculates :class:`_Bm25Data`"""

    # weights must be at least column_count long defaulting to 1.0.
    # Extra values are ignored.  This is done once here and remembered
    # while the C code does it on every row.
    weights: list[float] = list(args)
    if len(weights) < api.column_count:
        weights.extend([1] * (api.column_count - len(weights)))

    # number of phrases and rows in table
    nPhrase = api.phrase_count
    nRow = api.row_count

    # average document length (in tokens) for the table is total
    # number of tokens in all columns of all rows divided by the
    # number of rows
    avgdl = api.column_total_size(-1) / nRow

    # Calculate the inverse document frequency for each phrase
    aIDF: list[float] = []
    for i in range(nPhrase):
        # We need to know how many times the phrase occurs.
        nHit = 0

        def CountCb(_api: apsw.FTS5ExtensionApi, _closure: None):
            # Callback for each row matched.  The parameters are
            # unused.
            nonlocal nHit
            nHit += 1

        api.query_phrase(i, CountCb, None)

        # See the comment in the C code for details on IDF calculation
        idf = math.log((nRow - nHit + 0.5) / (nHit + 0.5))
        # ensure it is at least a positive small number
        idf = max(1e-6, idf)

        aIDF.append(idf)

    # Save for next time
    data = _Bm25Data(nPhrase, avgdl, aIDF, weights)
    api.aux_data = data
    return data


[docs] def bm25(api: apsw.FTS5ExtensionApi, *args: apsw.SQLiteValue) -> float: """Perform the BM25 calculation for a matching row It accepts weights for each column (default 1) which means how much a hit in that column counts for. The builtin function is `described here <https://www.sqlite.org/fts5.html#the_bm25_function>`__. This is a translation of the SQLite C version into Python for illustrative purposes. """ # aux_data is used to store the overall statistical data, # calculated once by _Bm25getData data = api.aux_data or _Bm25GetData(api, args) k1 = 1.2 b = 0.75 # This counts how often each phrase occurs in the row. For each # hit we multiply by the weight for the column, which defaults to # 1.0 aFreq: list[float] = [] for i in range(data.nPhrase): freq: float = 0 for column, offsets in enumerate(api.phrase_locations(i)): freq += data.weights[column] * len(offsets) aFreq.append(freq) # total number of tokens in this row nTok = api.column_size(-1) # calculate the score, starting with some constants k1 = 1.2 b = 0.75 D = nTok score: float = 0.0 for i in range(data.nPhrase): score += data.aIDF[i] * ((aFreq[i] * (k1 + 1.0)) / (aFreq[i] + k1 * (1 - b + b * D / data.avgdl))) # The score has a bigger (positive) number meaning a better match. # FTS5 wants you to do 'ORDER BY rank' giving the better matches # first. Negating the score achieves that. return -score
[docs] def inverse_document_frequency(api: apsw.FTS5ExtensionApi) -> list[float]: """Measures how rare each search phrase is in the content This helper method is intended for use in your own ranking functions. The result is the idf for each phrase in the query. A phrase occurring in almost every row will have a value close to zero, while less frequent phrases have increasingly large positive numbers. The values will always be at least 0.000001 so you don't have to worry about negative numbers or division by zero, even for phrases that are not found. """ # This is ported from the bm25 code above, but using Pythonic # naming idfs: list[float] = [] row_count = api.row_count for i in range(api.phrase_count): # We need to know how many times the phrase occurs. hits = 0 def count_callback(_api: apsw.FTS5ExtensionApi, _closure: None): # Callback for each row matched. The parameters are # unused. nonlocal hits hits += 1 api.query_phrase(i, count_callback, None) # See the comment in the C code for details on IDF calculation idf = math.log((row_count - hits + 0.5) / (hits + 0.5)) # ensure it is at least a positive small number idf = max(1e-6, idf) idfs.append(idf) return idfs
[docs] def subsequence(api: apsw.FTS5ExtensionApi, *args: apsw.SQLiteValue): """Ranking function boosting rows where tokens are in order :func:`bm25` doesn't take into account ordering. Phrase matches like ``"big truck"`` must occur exactly together in that order. Matches for ``big truck`` scores the same providing both words exist anywhere. This function boosts matches where the order does match so ``big red truck`` gets a boost while ``truck, big`` does not for the same query. If the query has phrases and operators (AND, OR, NOT) then those operators are not visible to this function, and it looks for ordering of each phrase. For example ``big OR truck NOT red`` will result in this function boosting ``big ... truck ... red`` in that order. See :attr:`apsw.fts5.QueryInfo.phrases`. It accepts parameters giving the weights for each column (default 1). """ # start with the bm25 base score score = bm25(api, *args) # degrade to bm25 if not enough phrases if api.phrase_count < 2: return score boost = 0 # work out which columns apply columns: set[int] = set.intersection(*(set(api.phrase_columns(i)) for i in range(api.phrase_count))) if columns: # shortest span possible - number of tokens in each phrase except 1 for last shortest_possible = sum(len(phrase) for phrase in api.phrases[:-1]) + 1 for column in columns: if api.aux_data.weights[column]: boost += ( sum(shortest_possible / span for span in _column_spans(api, column)) * api.aux_data.weights[column] ) # make it more negative to come earlier return score - boost
def _column_spans(api: apsw.FTS5ExtensionApi, column: int): # Helper for subsequence to get the spans (distance between first # token of first phrase and first token of last phrase offsets = [api.phrase_column_offsets(phrase, column) for phrase in range(api.phrase_count)] # these start at -1 because the loop below always advances by one first pos: list[int] = [-1] * api.phrase_count try: while True: # This finds a span starting with phrase[0] and all the # intermediate phrases, finishing on finding the last # phrase so we have a complete subsequence offset = -1 for column in range(api.phrase_count): while True: pos[column] += 1 if offsets[column][pos[column]] > offset: offset = offsets[column][pos[column]] break # If looking for A B C D the above could have stopped on # finding A B C A B C D. We now start at the penultimate # phrase C and advance it to just before D, going # backwards through the phrases so we end up with the # shortest possible A B C D. offset = offsets[-1][pos[-1]] for column in range(api.phrase_count - 2, -1, -1): for test_pos in range(len(offsets[column]) - 1, pos[column], -1): if offsets[column][test_pos] < offset: pos[column] = test_pos break offset = offsets[column][pos[column]] yield offsets[-1][pos[-1]] - offsets[0][pos[0]] except IndexError: # we don't bother constantly checking for overrun above as any # overrun means there are no more matches pass
[docs] def position_rank(api: apsw.FTS5ExtensionApi, *args: apsw.SQLiteValue): """Ranking function boosting the earlier in a column phrases are located :func:`bm25` doesn't take into where phrases occur. It makes no difference if a phrase occurs at the beginning, middle, or end. This boost takes into account how early the phrase match is, suitable for content with more significant text towards the beginning. If the query has phrases and operators (AND, OR, NOT) then those operators are not visible to this function, and only the location of each phrase is taken into consideration. See :attr:`apsw.fts5.QueryInfo.phrases`. It accepts parameters giving the weights for each column (default 1). """ # start with the bm25 base score score = bm25(api, *args) weights = api.aux_data.weights boost = 0 for phrase in range(api.phrase_count): for column in range(api.column_count): weight = weights[column] if weight: boost += sum(weight / (1 + offset) for offset in api.phrase_column_offsets(phrase, column)) # make it more negative to come earlier return score - boost