diff --git a/agents-api/agents_api/queries/docs/__init__.py b/agents-api/agents_api/queries/docs/__init__.py
new file mode 100644
index 000000000..0ba3db0d4
--- /dev/null
+++ b/agents-api/agents_api/queries/docs/__init__.py
@@ -0,0 +1,23 @@
+"""
+Module: agents_api/queries/docs
+
+This module manages document-related operations for agents and users. It is a core component of the document management system, covering document creation, listing, deletion, and snippet embedding for search and retrieval.
+
+Main functionalities include:
+- Creating new documents and associating them with agents or users.
+- Listing documents based on various criteria, including ownership and metadata filters.
+- Deleting documents by their unique identifiers.
+- Embedding document snippets for retrieval purposes.
+
+The module works together with the agents and users modules to provide document search, retrieval, and management features across the application.
+"""
+
+# ruff: noqa: F401, F403, F405
+
+from .create_doc import create_doc
+from .delete_doc import delete_doc
+from .embed_snippets import embed_snippets
+from .get_doc import get_doc
+from .list_docs import list_docs
+from .search_docs_by_embedding import search_docs_by_embedding
+from .search_docs_by_text import search_docs_by_text
diff --git a/agents-api/agents_api/queries/docs/create_doc.py b/agents-api/agents_api/queries/docs/create_doc.py
new file mode 100644
index 000000000..57be43bdf
--- /dev/null
+++ b/agents-api/agents_api/queries/docs/create_doc.py
@@ -0,0 +1,143 @@
+"""
+Timescale-based creation of docs.
+
+Mirrors the structure of create_file.py, but uses the docs/doc_owners tables.
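+
+Illustrative usage (a sketch; assumes pg_query supplies the database
+connection and that CreateDocRequest carries the fields referenced below):
+
+    doc = await create_doc(
+        developer_id=developer_id,
+        data=CreateDocRequest(title="Getting started", content="..."),
+        owner_type="agent",
+        owner_id=agent_id,
+    )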
+""" + +import base64 +import hashlib +from typing import Any, Literal +from uuid import UUID + +import asyncpg +from beartype import beartype +from fastapi import HTTPException +from sqlglot import parse_one +from uuid_extensions import uuid7 + +from ...autogen.openapi_model import CreateDocRequest, Doc +from ...metrics.counters import increase_counter +from ..utils import partialclass, pg_query, rewrap_exceptions, wrap_in_class + +# Base INSERT for docs +doc_query = parse_one(""" +INSERT INTO docs ( + developer_id, + doc_id, + title, + content, + index, + modality, + embedding_model, + embedding_dimensions, + language, + metadata +) +VALUES ( + $1, -- developer_id + $2, -- doc_id + $3, -- title + $4, -- content + $5, -- index + $6, -- modality + $7, -- embedding_model + $8, -- embedding_dimensions + $9, -- language + $10 -- metadata (JSONB) +) +RETURNING *; +""").sql(pretty=True) + +# Owner association query for doc_owners +doc_owner_query = parse_one(""" +WITH inserted_owner AS ( + INSERT INTO doc_owners ( + developer_id, + doc_id, + owner_type, + owner_id + ) + VALUES ($1, $2, $3, $4) + RETURNING doc_id +) +SELECT d.* +FROM inserted_owner io +JOIN docs d ON d.doc_id = io.doc_id; +""").sql(pretty=True) + + +@rewrap_exceptions( + { + asyncpg.UniqueViolationError: partialclass( + HTTPException, + status_code=409, + detail="A document with this ID already exists for this developer", + ), + asyncpg.NoDataFoundError: partialclass( + HTTPException, + status_code=404, + detail="The specified owner does not exist", + ), + asyncpg.ForeignKeyViolationError: partialclass( + HTTPException, + status_code=404, + detail="Developer or doc owner not found", + ), + } +) +@wrap_in_class( + Doc, + one=True, + transform=lambda d: { + **d, + "id": d["doc_id"], + # You could optionally return a computed hash or partial content if desired + }, +) +@increase_counter("create_doc") +@pg_query +@beartype +async def create_doc( + *, + developer_id: UUID, + doc_id: UUID | None = None, + data: CreateDocRequest, + owner_type: Literal["user", "agent", "org"] | None = None, + owner_id: UUID | None = None, +) -> list[tuple[str, list]]: + """ + Insert a new doc record into Timescale and optionally associate it with an owner. + """ + # Generate a UUID if not provided + doc_id = doc_id or uuid7() + + # Create the doc record + doc_params = [ + developer_id, + doc_id, + data.title, + data.content, + data.index or 0, # fallback if no snippet index + data.modality or "text", + data.embedding_model or "none", + data.embedding_dimensions or 0, + data.language or "english", + data.metadata or {}, + ] + + queries = [(doc_query, doc_params)] + + # If an owner is specified, associate it: + if owner_type and owner_id: + owner_params = [developer_id, doc_id, owner_type, owner_id] + queries.append((doc_owner_query, owner_params)) + + return queries diff --git a/agents-api/agents_api/queries/docs/delete_doc.py b/agents-api/agents_api/queries/docs/delete_doc.py new file mode 100644 index 000000000..d1e02faf1 --- /dev/null +++ b/agents-api/agents_api/queries/docs/delete_doc.py @@ -0,0 +1,77 @@ +""" +Timescale-based deletion of a doc record. 
+""" +from typing import Literal +from uuid import UUID + +import asyncpg +from beartype import beartype +from fastapi import HTTPException +from sqlglot import parse_one + +from ...autogen.openapi_model import ResourceDeletedResponse +from ...common.utils.datetime import utcnow +from ..utils import partialclass, pg_query, rewrap_exceptions, wrap_in_class + +# Delete doc query + ownership check +delete_doc_query = parse_one(""" +WITH deleted_owners AS ( + DELETE FROM doc_owners + WHERE developer_id = $1 + AND doc_id = $2 + AND ( + ($3::text IS NULL AND $4::uuid IS NULL) + OR (owner_type = $3 AND owner_id = $4) + ) +) +DELETE FROM docs +WHERE developer_id = $1 + AND doc_id = $2 + AND ( + $3::text IS NULL OR EXISTS ( + SELECT 1 FROM doc_owners + WHERE developer_id = $1 + AND doc_id = $2 + AND owner_type = $3 + AND owner_id = $4 + ) + ) +RETURNING doc_id; +""").sql(pretty=True) + + +@rewrap_exceptions( + { + asyncpg.NoDataFoundError: partialclass( + HTTPException, + status_code=404, + detail="Doc not found", + ) + } +) +@wrap_in_class( + ResourceDeletedResponse, + one=True, + transform=lambda d: { + "id": d["doc_id"], + "deleted_at": utcnow(), + "jobs": [], + }, +) +@pg_query +@beartype +async def delete_doc( + *, + developer_id: UUID, + doc_id: UUID, + owner_type: Literal["user", "agent", "org"] | None = None, + owner_id: UUID | None = None, +) -> tuple[str, list]: + """ + Deletes a doc (and associated doc_owners) for the given developer and doc_id. + If owner_type/owner_id is specified, only remove doc if that matches. + """ + return ( + delete_doc_query, + [developer_id, doc_id, owner_type, owner_id], + ) diff --git a/agents-api/agents_api/queries/docs/embed_snippets.py b/agents-api/agents_api/queries/docs/embed_snippets.py new file mode 100644 index 000000000..e69de29bb diff --git a/agents-api/agents_api/queries/docs/get_doc.py b/agents-api/agents_api/queries/docs/get_doc.py new file mode 100644 index 000000000..a0345f5e3 --- /dev/null +++ b/agents-api/agents_api/queries/docs/get_doc.py @@ -0,0 +1,52 @@ +""" +Timescale-based retrieval of a single doc record. +""" +from typing import Literal +from uuid import UUID + +import asyncpg +from beartype import beartype +from fastapi import HTTPException +from sqlglot import parse_one + +from ...autogen.openapi_model import Doc +from ..utils import pg_query, wrap_in_class + +doc_query = parse_one(""" +SELECT d.* +FROM docs d +LEFT JOIN doc_owners do ON d.developer_id = do.developer_id AND d.doc_id = do.doc_id +WHERE d.developer_id = $1 + AND d.doc_id = $2 + AND ( + ($3::text IS NULL AND $4::uuid IS NULL) + OR (do.owner_type = $3 AND do.owner_id = $4) + ) +LIMIT 1; +""").sql(pretty=True) + + +@wrap_in_class( + Doc, + one=True, + transform=lambda d: { + **d, + "id": d["doc_id"], + }, +) +@pg_query +@beartype +async def get_doc( + *, + developer_id: UUID, + doc_id: UUID, + owner_type: Literal["user", "agent", "org"] | None = None, + owner_id: UUID | None = None +) -> tuple[str, list]: + """ + Fetch a single doc, optionally constrained to a given owner. + """ + return ( + doc_query, + [developer_id, doc_id, owner_type, owner_id], + ) diff --git a/agents-api/agents_api/queries/docs/list_docs.py b/agents-api/agents_api/queries/docs/list_docs.py new file mode 100644 index 000000000..b145a1cbc --- /dev/null +++ b/agents-api/agents_api/queries/docs/list_docs.py @@ -0,0 +1,91 @@ +""" +Timescale-based listing of docs with optional owner filter and pagination. 
+""" +from typing import Literal +from uuid import UUID + +import asyncpg +from beartype import beartype +from fastapi import HTTPException +from sqlglot import parse_one + +from ...autogen.openapi_model import Doc +from ..utils import pg_query, wrap_in_class + +# Basic listing for all docs by developer +developer_docs_query = parse_one(""" +SELECT d.* +FROM docs d +LEFT JOIN doc_owners do ON d.developer_id = do.developer_id AND d.doc_id = do.doc_id +WHERE d.developer_id = $1 +ORDER BY +CASE + WHEN $4 = 'created_at' AND $5 = 'asc' THEN d.created_at + WHEN $4 = 'created_at' AND $5 = 'desc' THEN d.created_at + WHEN $4 = 'updated_at' AND $5 = 'asc' THEN d.updated_at + WHEN $4 = 'updated_at' AND $5 = 'desc' THEN d.updated_at +END DESC NULLS LAST +LIMIT $2 +OFFSET $3; +""").sql(pretty=True) + +# Listing for docs associated with a specific owner +owner_docs_query = parse_one(""" +SELECT d.* +FROM docs d +JOIN doc_owners do ON d.developer_id = do.developer_id AND d.doc_id = do.doc_id +WHERE do.developer_id = $1 + AND do.owner_id = $6 + AND do.owner_type = $7 +ORDER BY +CASE + WHEN $4 = 'created_at' AND $5 = 'asc' THEN d.created_at + WHEN $4 = 'created_at' AND $5 = 'desc' THEN d.created_at + WHEN $4 = 'updated_at' AND $5 = 'asc' THEN d.updated_at + WHEN $4 = 'updated_at' AND $5 = 'desc' THEN d.updated_at +END DESC NULLS LAST +LIMIT $2 +OFFSET $3; +""").sql(pretty=True) + + +@wrap_in_class( + Doc, + one=False, + transform=lambda d: { + **d, + "id": d["doc_id"], + }, +) +@pg_query +@beartype +async def list_docs( + *, + developer_id: UUID, + owner_id: UUID | None = None, + owner_type: Literal["user", "agent", "org"] | None = None, + limit: int = 100, + offset: int = 0, + sort_by: Literal["created_at", "updated_at"] = "created_at", + direction: Literal["asc", "desc"] = "desc", +) -> tuple[str, list]: + """ + Lists docs with optional owner filtering, pagination, and sorting. + """ + if direction.lower() not in ["asc", "desc"]: + raise HTTPException(status_code=400, detail="Invalid sort direction") + + if limit > 100 or limit < 1: + raise HTTPException(status_code=400, detail="Limit must be between 1 and 100") + + if offset < 0: + raise HTTPException(status_code=400, detail="Offset must be >= 0") + + params = [developer_id, limit, offset, sort_by, direction] + if owner_id and owner_type: + params.extend([owner_id, owner_type]) + query = owner_docs_query + else: + query = developer_docs_query + + return (query, params) diff --git a/agents-api/agents_api/queries/docs/mmr.py b/agents-api/agents_api/queries/docs/mmr.py new file mode 100644 index 000000000..d214e8c04 --- /dev/null +++ b/agents-api/agents_api/queries/docs/mmr.py @@ -0,0 +1,109 @@ +from __future__ import annotations + +import logging +from typing import Union + +import numpy as np + +Matrix = Union[list[list[float]], list[np.ndarray], np.ndarray] + +logger = logging.getLogger(__name__) + + +def _cosine_similarity(x: Matrix, y: Matrix) -> np.ndarray: + """Row-wise cosine similarity between two equal-width matrices. + + Args: + x: A matrix of shape (n, m). + y: A matrix of shape (k, m). + + Returns: + A matrix of shape (n, k) where each element (i, j) is the cosine similarity + between the ith row of X and the jth row of Y. + + Raises: + ValueError: If the number of columns in X and Y are not the same. + ImportError: If numpy is not installed. 
+    """
+
+    if len(x) == 0 or len(y) == 0:
+        return np.array([])
+
+    x = [xx for xx in x if xx is not None]
+    y = [yy for yy in y if yy is not None]
+
+    x = np.array(x)
+    y = np.array(y)
+    if x.shape[1] != y.shape[1]:
+        msg = (
+            f"Number of columns in X and Y must be the same. X has shape {x.shape} "
+            f"and Y has shape {y.shape}."
+        )
+        raise ValueError(msg)
+    try:
+        import simsimd as simd  # type: ignore
+
+        x = np.array(x, dtype=np.float32)
+        y = np.array(y, dtype=np.float32)
+        z = 1 - np.array(simd.cdist(x, y, metric="cosine"))
+        return z
+    except ImportError:
+        logger.debug(
+            "Unable to import simsimd, defaulting to NumPy implementation. If you want "
+            "to use simsimd please install with `pip install simsimd`."
+        )
+        x_norm = np.linalg.norm(x, axis=1)
+        y_norm = np.linalg.norm(y, axis=1)
+        # Ignore divide-by-zero runtime warnings; those entries are zeroed below.
+        with np.errstate(divide="ignore", invalid="ignore"):
+            similarity = np.dot(x, y.T) / np.outer(x_norm, y_norm)
+        similarity[np.isnan(similarity) | np.isinf(similarity)] = 0.0
+        return similarity
+
+
+def maximal_marginal_relevance(
+    query_embedding: np.ndarray,
+    embedding_list: list,
+    lambda_mult: float = 0.5,
+    k: int = 4,
+) -> list[int]:
+    """Calculate maximal marginal relevance.
+
+    Args:
+        query_embedding: The query embedding.
+        embedding_list: A list of embeddings.
+        lambda_mult: The lambda parameter for MMR. Default is 0.5.
+        k: The number of embeddings to return. Default is 4.
+
+    Returns:
+        A list of indices of the embeddings to return.
+    """
+
+    if min(k, len(embedding_list)) <= 0:
+        return []
+    if query_embedding.ndim == 1:
+        query_embedding = np.expand_dims(query_embedding, axis=0)
+    similarity_to_query = _cosine_similarity(query_embedding, embedding_list)[0]
+    most_similar = int(np.argmax(similarity_to_query))
+    idxs = [most_similar]
+    selected = np.array([embedding_list[most_similar]])
+    while len(idxs) < min(k, len(embedding_list)):
+        best_score = -np.inf
+        idx_to_add = -1
+        similarity_to_selected = _cosine_similarity(embedding_list, selected)
+        for i, query_score in enumerate(similarity_to_query):
+            if i in idxs:
+                continue
+            redundant_score = max(similarity_to_selected[i])
+            equation_score = (
+                lambda_mult * query_score - (1 - lambda_mult) * redundant_score
+            )
+            if equation_score > best_score:
+                best_score = equation_score
+                idx_to_add = i
+        idxs.append(idx_to_add)
+        selected = np.append(selected, [embedding_list[idx_to_add]], axis=0)
+    return idxs
diff --git a/agents-api/agents_api/queries/docs/search_docs_by_embedding.py b/agents-api/agents_api/queries/docs/search_docs_by_embedding.py
new file mode 100644
index 000000000..c62188b61
--- /dev/null
+++ b/agents-api/agents_api/queries/docs/search_docs_by_embedding.py
@@ -0,0 +1,73 @@
+"""
+Timescale-based doc embedding search using the `embedding` column.
+"""
+
+from typing import Literal, List
+from uuid import UUID
+
+from beartype import beartype
+from fastapi import HTTPException
+from sqlglot import parse_one
+
+from ...autogen.openapi_model import Doc
+from ..utils import pg_query, wrap_in_class
+
+# If you're doing approximate ANN (DiskANN) or IVF, you might use a special function or hint.
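+# e.g. (assumption: pgvector-style ivfflat/hnsw indexes are in use) a session
+# knob such as `SET LOCAL ivfflat.probes = 10;` or `SET LOCAL hnsw.ef_search = 100;`
+# issued before this SELECT trades recall for speed.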
+# For a basic vector distance search, you can do something like:
+search_docs_by_embedding_query = parse_one("""
+SELECT d.*,
+       (d.embedding <-> $3) AS distance
+FROM docs d
+LEFT JOIN doc_owners do
+    ON d.developer_id = do.developer_id
+    AND d.doc_id = do.doc_id
+WHERE d.developer_id = $1
+  AND (
+    ($4::text IS NULL AND $5::uuid IS NULL)
+    OR (do.owner_type = $4 AND do.owner_id = $5)
+  )
+  AND d.embedding IS NOT NULL
+ORDER BY d.embedding <-> $3
+LIMIT $2;
+""").sql(pretty=True)
+
+
+@wrap_in_class(
+    Doc,
+    one=False,
+    transform=lambda rec: {
+        **rec,
+        "id": rec["doc_id"],
+    },
+)
+@pg_query
+@beartype
+async def search_docs_by_embedding(
+    *,
+    developer_id: UUID,
+    query_embedding: List[float],
+    k: int = 10,
+    owner_type: Literal["user", "agent", "org"] | None = None,
+    owner_id: UUID | None = None,
+) -> tuple[str, list]:
+    """
+    Vector-based doc search:
+    - developer_id is required
+    - query_embedding: the vector to query
+    - k: number of results to return
+    - owner_type/owner_id: optional doc ownership filter
+    """
+    if k < 1:
+        raise HTTPException(status_code=400, detail="k must be >= 1")
+
+    # Validate embedding length if needed, e.g. 1024 floats
+    if not query_embedding:
+        raise HTTPException(status_code=400, detail="Empty embedding provided")
+
+    return (
+        search_docs_by_embedding_query,
+        [developer_id, k, query_embedding, owner_type, owner_id],
+    )
diff --git a/agents-api/agents_api/queries/docs/search_docs_by_text.py b/agents-api/agents_api/queries/docs/search_docs_by_text.py
new file mode 100644
index 000000000..c9a5a93e2
--- /dev/null
+++ b/agents-api/agents_api/queries/docs/search_docs_by_text.py
@@ -0,0 +1,73 @@
+"""
+Timescale-based doc text search using the `search_tsv` column.
+"""
+
+from typing import Literal
+from uuid import UUID
+
+from beartype import beartype
+from fastapi import HTTPException
+from sqlglot import parse_one
+
+from ...autogen.openapi_model import Doc
+from ..utils import pg_query, wrap_in_class
+
+search_docs_text_query = parse_one("""
+SELECT d.*,
+       ts_rank_cd(d.search_tsv, websearch_to_tsquery($3)) AS rank
+FROM docs d
+LEFT JOIN doc_owners do
+    ON d.developer_id = do.developer_id
+    AND d.doc_id = do.doc_id
+WHERE d.developer_id = $1
+  AND (
+    ($4::text IS NULL AND $5::uuid IS NULL)
+    OR (do.owner_type = $4 AND do.owner_id = $5)
+  )
+  AND d.search_tsv @@ websearch_to_tsquery($3)
+ORDER BY rank DESC
+LIMIT $2;
+""").sql(pretty=True)
+
+
+@wrap_in_class(
+    Doc,
+    one=False,
+    transform=lambda rec: {
+        **rec,
+        "id": rec["doc_id"],
+    },
+)
+@pg_query
+@beartype
+async def search_docs_by_text(
+    *,
+    developer_id: UUID,
+    query: str,
+    k: int = 10,
+    owner_type: Literal["user", "agent", "org"] | None = None,
+    owner_id: UUID | None = None,
+) -> tuple[str, list]:
+    """
+    Full-text search on docs using the search_tsv column.
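+
+    For instance (illustrative):
+
+        docs = await search_docs_by_text(
+            developer_id=developer_id,
+            query="vacation OR sabbatical",
+            k=5,
+        )
+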
+    - developer_id: required
+    - query: the text to look for
+    - k: max results
+    - owner_type / owner_id: optional doc ownership filter
+    """
+    if k < 1:
+        raise HTTPException(status_code=400, detail="k must be >= 1")
+
+    return (
+        search_docs_text_query,
+        [developer_id, k, query, owner_type, owner_id],
+    )
diff --git a/agents-api/agents_api/queries/docs/search_docs_hybrid.py b/agents-api/agents_api/queries/docs/search_docs_hybrid.py
new file mode 100644
index 000000000..9e8d84dc7
--- /dev/null
+++ b/agents-api/agents_api/queries/docs/search_docs_hybrid.py
@@ -0,0 +1,169 @@
+"""
+Hybrid doc search that merges text search and embedding search results
+via a simple distribution-based score fusion or direct weighting in Python.
+"""
+
+import statistics
+from asyncio import gather
+from typing import Literal, List
+from uuid import UUID
+
+from beartype import beartype
+
+from ...autogen.openapi_model import Doc
+from .search_docs_by_text import search_docs_by_text
+from .search_docs_by_embedding import search_docs_by_embedding
+
+
+def dbsf_normalize(scores: List[float]) -> List[float]:
+    """
+    Distribution-based normalization: clamp each score to
+    (mean - 3 * stddev, mean + 3 * stddev), then scale to 0..1.
+    """
+    if len(scores) < 2:
+        return scores
+    m = statistics.mean(scores)
+    sd = statistics.pstdev(scores)  # population std
+    if sd == 0:
+        return scores
+    upper = m + 3 * sd
+    lower = m - 3 * sd
+
+    def clamp_scale(v):
+        c = min(upper, max(lower, v))
+        return (c - lower) / (upper - lower)
+
+    return [clamp_scale(s) for s in scores]
+
+
+@beartype
+def fuse_results(
+    text_docs: List[Doc], embedding_docs: List[Doc], alpha: float
+) -> List[Doc]:
+    """
+    Merge text search results (descending by text rank) with embedding
+    results (descending by closeness, i.e. inverse distance).
+    alpha is the weight given to the embedding score.
+    """
+    # Each doc carries a "distance" from its search: for embeddings, lower is
+    # better; the text search stores its rank in the same field. Negating
+    # both yields unified "bigger is better" scores.
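+    # Worked example (hypothetical numbers): with alpha = 0.5, a doc whose
+    # normalized text score is 0.8 and embedding score is 0.4 fuses to
+    # 0.5 * 0.4 + 0.5 * 0.8 = 0.6.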
+    text_scores = {}
+    embed_scores = {}
+    for doc in text_docs:
+        # Assumes the text query stored its (negated) rank in doc.distance
+        text_scores[doc.id] = float(-doc.distance if doc.distance else 0)
+
+    for doc in embedding_docs:
+        # Lower distance => better, so embed_score = -distance
+        embed_scores[doc.id] = float(-doc.distance if doc.distance else 0)
+
+    # Normalize both score sets
+    text_vals = list(text_scores.values())
+    embed_vals = list(embed_scores.values())
+    text_vals_norm = dbsf_normalize(text_vals)
+    embed_vals_norm = dbsf_normalize(embed_vals)
+
+    # Map the normalized scores back
+    t_keys = list(text_scores.keys())
+    for i, key in enumerate(t_keys):
+        text_scores[key] = text_vals_norm[i]
+    e_keys = list(embed_scores.keys())
+    for i, key in enumerate(e_keys):
+        embed_scores[key] = embed_vals_norm[i]
+
+    # Gather all doc IDs
+    all_ids = set(text_scores.keys()) | set(embed_scores.keys())
+
+    # Weighted sum => combined score
+    out = []
+    for doc_id in all_ids:
+        # A doc missing from either result set scores 0 on that side
+        t_score = text_scores.get(doc_id, 0)
+        e_score = embed_scores.get(doc_id, 0)
+        combined = alpha * e_score + (1 - alpha) * t_score
+        out.append((doc_id, combined))
+
+    # Sort descending by combined score
+    out.sort(key=lambda x: x[1], reverse=True)
+
+    # Convert back to Doc objects: prefer the text_docs copy, falling back to
+    # the embedding_docs copy when a doc only appeared in that result set.
+    text_map = {d.id: d for d in text_docs}
+    embed_map = {d.id: d for d in embedding_docs}
+
+    final_docs = []
+    for doc_id, score in out:
+        doc = text_map.get(doc_id) or embed_map.get(doc_id)
+        doc = doc.model_copy()  # Pydantic v2 copy
+        doc.distance = float(-score)  # higher combined => smaller distance
+        final_docs.append(doc)
+    return final_docs
+
+
+@beartype
+async def search_docs_hybrid(
+    developer_id: UUID,
+    text_query: str = "",
+    embedding: List[float] | None = None,
+    k: int = 10,
+    alpha: float = 0.5,
+    owner_type: Literal["user", "agent", "org"] | None = None,
+    owner_id: UUID | None = None,
+) -> List[Doc]:
+    """
+    Hybrid text-and-embedding doc search: fetch top-k from each method,
+    then fuse the results client-side and return the overall top-k.
+    """
+
+    async def _empty() -> List[Doc]:
+        return []
+
+    # Dispatch the two queries (one full-text, one embedding-based)
+    # concurrently, each limited to k results
+    tasks = []
+    if text_query.strip():
+        tasks.append(
+            search_docs_by_text(
+                developer_id=developer_id,
+                query=text_query,
+                k=k,
+                owner_type=owner_type,
+                owner_id=owner_id,
+            )
+        )
+    else:
+        tasks.append(_empty())  # no text results if query is empty
+
+    if embedding and any(embedding):
+        tasks.append(
+            search_docs_by_embedding(
+                developer_id=developer_id,
+                query_embedding=embedding,
+                k=k,
+                owner_type=owner_type,
+                owner_id=owner_id,
+            )
+        )
+    else:
+        tasks.append(_empty())  # no embedding results without a query vector
+
+    # gather requires awaitables, hence the _empty() placeholders above
+    text_results, embed_results = await gather(*tasks)
+
+    # Fuse the two ranked lists
+    fused = fuse_results(text_results, embed_results, alpha)
+    # Then pick top k overall
+    return fused[:k]
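+
+# Illustrative call (a sketch; assumes `embedding` matches the dimensionality
+# of the stored doc embeddings):
+#     results = await search_docs_hybrid(
+#         developer_id=developer_id,
+#         text_query="vacation policy",
+#         embedding=query_embedding,
+#         k=5,
+#     )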