Merge pull request #3149 from danswer-ai/hotfix/v0.13-overlapping-connectors

merge overlapping connector hotfix
rkuo-danswer authored Nov 17, 2024
2 parents cce0ec2 + 9e39149 commit 574ef47
Showing 4 changed files with 124 additions and 48 deletions.
16 changes: 8 additions & 8 deletions backend/danswer/background/indexing/run_indexing.py
@@ -433,22 +433,22 @@ def run_indexing_entrypoint(
     with get_session_with_tenant(tenant_id) as db_session:
         attempt = transition_attempt_to_in_progress(index_attempt_id, db_session)
 
+        tenant_str = ""
+        if tenant_id is not None:
+            tenant_str = f" for tenant {tenant_id}"
+
         logger.info(
-            f"Indexing starting for tenant {tenant_id}: "
-            if tenant_id is not None
-            else ""
-            + f"connector='{attempt.connector_credential_pair.connector.name}' "
+            f"Indexing starting{tenant_str}: "
+            f"connector='{attempt.connector_credential_pair.connector.name}' "
             f"config='{attempt.connector_credential_pair.connector.connector_specific_config}' "
             f"credentials='{attempt.connector_credential_pair.connector_id}'"
         )
 
         _run_indexing(db_session, attempt, tenant_id, callback)
 
         logger.info(
-            f"Indexing finished for tenant {tenant_id}: "
-            if tenant_id is not None
-            else ""
-            + f"connector='{attempt.connector_credential_pair.connector.name}' "
+            f"Indexing finished{tenant_str}: "
+            f"connector='{attempt.connector_credential_pair.connector.name}' "
             f"config='{attempt.connector_credential_pair.connector.connector_specific_config}' "
             f"credentials='{attempt.connector_credential_pair.connector_id}'"
         )
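Note on the hunk above: Python's conditional expression binds more loosely than `+` and than implicit f-string concatenation, so the old `logger.info(...)` argument parsed as `prefix if tenant_id is not None else ("" + rest)`. Whenever a tenant was set, everything after the prefix was silently dropped from the log line. A minimal standalone repro of the pitfall (variable values are illustrative, not from the codebase):

tenant_id = "tenant_1"
name = "confluence"

# Parses as: prefix if cond else ("" + f"connector=..."), so with a tenant
# set, only the prefix survives and the connector detail is lost.
broken = (
    f"Indexing starting for tenant {tenant_id}: "
    if tenant_id is not None
    else "" + f"connector='{name}' "
)
print(broken)  # -> Indexing starting for tenant tenant_1:

# The fix: compute the optional fragment first; adjacent f-string literals
# then concatenate unconditionally.
tenant_str = f" for tenant {tenant_id}" if tenant_id is not None else ""
fixed = f"Indexing starting{tenant_str}: " f"connector='{name}' "
print(fixed)  # -> Indexing starting for tenant tenant_1: connector='confluence'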
25 changes: 7 additions & 18 deletions backend/danswer/db/document.py
@@ -323,23 +323,23 @@ def upsert_documents(


 def upsert_document_by_connector_credential_pair(
-    db_session: Session, document_metadata_batch: list[DocumentMetadata]
+    db_session: Session, connector_id: int, credential_id: int, document_ids: list[str]
 ) -> None:
     """NOTE: this function is Postgres specific. Not all DBs support the ON CONFLICT clause."""
-    if not document_metadata_batch:
-        logger.info("`document_metadata_batch` is empty. Skipping.")
+    if not document_ids:
+        logger.info("`document_ids` is empty. Skipping.")
         return
 
     insert_stmt = insert(DocumentByConnectorCredentialPair).values(
         [
             model_to_dict(
                 DocumentByConnectorCredentialPair(
-                    id=document_metadata.document_id,
-                    connector_id=document_metadata.connector_id,
-                    credential_id=document_metadata.credential_id,
+                    id=doc_id,
+                    connector_id=connector_id,
+                    credential_id=credential_id,
                 )
             )
-            for document_metadata in document_metadata_batch
+            for doc_id in document_ids
         ]
     )
     # for now, there are no columns to update. If more metadata is added, then this
@@ -400,17 +400,6 @@ def mark_document_as_synced(document_id: str, db_session: Session) -> None:
     db_session.commit()
 
 
-def upsert_documents_complete(
-    db_session: Session,
-    document_metadata_batch: list[DocumentMetadata],
-) -> None:
-    upsert_documents(db_session, document_metadata_batch)
-    upsert_document_by_connector_credential_pair(db_session, document_metadata_batch)
-    logger.info(
-        f"Upserted {len(document_metadata_batch)} document store entries into DB"
-    )
-
-
 def delete_document_by_connector_credential_pair__no_commit(
     db_session: Session,
     document_id: str,
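The hunk above is cut off at the ON CONFLICT comment. For context, `insert` here has to be the Postgres dialect's `sqlalchemy.dialects.postgresql.insert`, whose statements expose the `on_conflict_*` methods the docstring alludes to. A sketch of how such an upsert is typically completed and executed — the helper name, the model's import path, and the `on_conflict_do_nothing()` call are assumptions consistent with the docstring, not lines shown in this diff:

from sqlalchemy.dialects.postgresql import insert  # Postgres-specific insert
from sqlalchemy.orm import Session

from danswer.db.models import DocumentByConnectorCredentialPair  # assumed path


def upsert_doc_cc_pair_rows(  # hypothetical helper for illustration
    db_session: Session,
    connector_id: int,
    credential_id: int,
    document_ids: list[str],
) -> None:
    insert_stmt = insert(DocumentByConnectorCredentialPair).values(
        [
            {"id": doc_id, "connector_id": connector_id, "credential_id": credential_id}
            for doc_id in document_ids
        ]
    )
    # With no columns worth updating yet, a conflicting row (same key) is
    # simply left untouched instead of raising IntegrityError.
    db_session.execute(insert_stmt.on_conflict_do_nothing())
    db_session.commit()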
59 changes: 37 additions & 22 deletions backend/danswer/indexing/indexing_pipeline.py
@@ -20,7 +20,8 @@
 from danswer.db.document import prepare_to_modify_documents
 from danswer.db.document import update_docs_last_modified__no_commit
 from danswer.db.document import update_docs_updated_at__no_commit
-from danswer.db.document import upsert_documents_complete
+from danswer.db.document import upsert_document_by_connector_credential_pair
+from danswer.db.document import upsert_documents
 from danswer.db.document_set import fetch_document_sets_for_documents
 from danswer.db.index_attempt import create_index_attempt_error
 from danswer.db.models import Document as DBDocument
@@ -56,13 +57,13 @@ def __call__(
         ...
 
 
-def upsert_documents_in_db(
+def _upsert_documents_in_db(
     documents: list[Document],
     index_attempt_metadata: IndexAttemptMetadata,
     db_session: Session,
 ) -> None:
     # Metadata here refers to basic document info, not metadata about the actual content
-    doc_m_batch: list[DocumentMetadata] = []
+    document_metadata_list: list[DocumentMetadata] = []
     for doc in documents:
         first_link = next(
             (section.link for section in doc.sections if section.link), ""
@@ -77,12 +78,9 @@ def upsert_documents_in_db(
             secondary_owners=get_experts_stores_representations(doc.secondary_owners),
             from_ingestion_api=doc.from_ingestion_api,
         )
-        doc_m_batch.append(db_doc_metadata)
+        document_metadata_list.append(db_doc_metadata)
 
-    upsert_documents_complete(
-        db_session=db_session,
-        document_metadata_batch=doc_m_batch,
-    )
+    upsert_documents(db_session, document_metadata_list)
 
     # Insert document content metadata
     for doc in documents:
@@ -109,7 +107,10 @@ def get_doc_ids_to_update(
     documents: list[Document], db_docs: list[DBDocument]
 ) -> list[Document]:
     """Figures out which documents actually need to be updated. If a document is already present
-    and the `updated_at` hasn't changed, we shouldn't need to do anything with it."""
+    and the `updated_at` hasn't changed, we shouldn't need to do anything with it.
+
+    NB: Still need to associate the document in the DB if multiple connectors are
+    indexing the same doc."""
     id_update_time_map = {
         doc.id: doc.doc_updated_at for doc in db_docs if doc.doc_updated_at
     }
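Only the first statement of `get_doc_ids_to_update` is visible in this hunk. To make the short-circuit concrete, here is a hedged sketch of the presumable remainder of the function body — keep a document when it is new to the DB or strictly newer than the stored timestamp; only `id_update_time_map` actually appears in the diff, so the loop below is an assumption:

    updatable_docs: list[Document] = []
    for doc in documents:
        update_time = id_update_time_map.get(doc.id)
        if update_time and doc.doc_updated_at and doc.doc_updated_at <= update_time:
            # Already indexed and not newer: skip the expensive re-index path.
            # (Per the NB added above, the doc -> cc-pair association still
            # has to be written elsewhere, which is what this commit fixes.)
            continue
        updatable_docs.append(doc)
    return updatable_docs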
@@ -197,7 +198,7 @@ def index_doc_batch_prepare(
 ) -> DocumentBatchPrepareContext | None:
     """This sets up the documents in the relational DB (source of truth) for permissions, metadata, etc.
     This preceeds indexing it into the actual document index."""
-    documents = []
+    documents: list[Document] = []
     for document in document_batch:
         empty_contents = not any(section.text.strip() for section in document.sections)
         if (
@@ -223,32 +224,46 @@
         else:
             documents.append(document)
 
-    document_ids = [document.id for document in documents]
+    # Create a trimmed list of docs that don't have a newer updated at
+    # Shortcuts the time-consuming flow on connector index retries
+    document_ids: list[str] = [document.id for document in documents]
     db_docs: list[DBDocument] = get_documents_by_ids(
         db_session=db_session,
         document_ids=document_ids,
     )
 
-    # Skip indexing docs that don't have a newer updated at
-    # Shortcuts the time-consuming flow on connector index retries
     updatable_docs = (
         get_doc_ids_to_update(documents=documents, db_docs=db_docs)
         if not ignore_time_skip
         else documents
     )
 
-    # No docs to update either because the batch is empty or every doc was already indexed
-    if not updatable_docs:
-        return None
-
-    # Create records in the source of truth about these documents,
-    # does not include doc_updated_at which is also used to indicate a successful update
-    upsert_documents_in_db(
-        documents=documents,
-        index_attempt_metadata=index_attempt_metadata,
-        db_session=db_session,
-    )
+    # for all updatable docs, upsert into the DB
+    # Does not include doc_updated_at which is also used to indicate a successful update
+    if updatable_docs:
+        _upsert_documents_in_db(
+            documents=updatable_docs,
+            index_attempt_metadata=index_attempt_metadata,
+            db_session=db_session,
+        )
+
+        logger.info(
+            f"Upserted {len(updatable_docs)} changed docs out of "
+            f"{len(documents)} total docs into the DB"
+        )
+
+    # for all docs, upsert the document to cc pair relationship
+    upsert_document_by_connector_credential_pair(
+        db_session,
+        index_attempt_metadata.connector_id,
+        index_attempt_metadata.credential_id,
+        document_ids,
+    )
+
+    # No docs to process because the batch is empty or every doc was already indexed
+    if not updatable_docs:
+        return None
 
     id_to_db_doc_map = {doc.id: doc for doc in db_docs}
     return DocumentBatchPrepareContext(
         updatable_docs=updatable_docs, id_to_db_doc_map=id_to_db_doc_map
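The behavioral fix in the hunk above: the early `return None` used to run before any DB writes, so when a second cc-pair indexed documents that were already up to date from another connector, its rows never landed in the doc-to-cc-pair table and the documents appeared to belong only to the first connector. The new ordering writes the association unconditionally. A condensed restatement of the resulting control flow (paraphrased from the diff, not additional code in the commit):

# 1. Upsert Document rows only for docs whose content actually changed.
if updatable_docs:
    _upsert_documents_in_db(
        documents=updatable_docs,
        index_attempt_metadata=index_attempt_metadata,
        db_session=db_session,
    )

# 2. Always link every doc in the batch to this (connector, credential)
#    pair -- this unconditional step is the overlapping-connector fix.
upsert_document_by_connector_credential_pair(
    db_session,
    index_attempt_metadata.connector_id,
    index_attempt_metadata.credential_id,
    document_ids,
)

# 3. Only now short-circuit when nothing needs re-indexing.
if not updatable_docs:
    return None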
@@ -29,6 +29,78 @@
 from tests.integration.common_utils.vespa import vespa_fixture
 
 
+# def test_connector_creation(reset: None) -> None:
+#     # Creating an admin user (first user created is automatically an admin)
+#     admin_user: DATestUser = UserManager.create(name="admin_user")
+
+#     # create connectors
+#     cc_pair_1 = CCPairManager.create_from_scratch(
+#         source=DocumentSource.INGESTION_API,
+#         user_performing_action=admin_user,
+#     )
+
+#     cc_pair_info = CCPairManager.get_single(
+#         cc_pair_1.id, user_performing_action=admin_user
+#     )
+#     assert cc_pair_info
+#     assert cc_pair_info.creator
+#     assert str(cc_pair_info.creator) == admin_user.id
+#     assert cc_pair_info.creator_email == admin_user.email
+
+
+# TODO(rkuo): will enable this once i have credentials on github
+# def test_overlapping_connector_creation(reset: None) -> None:
+#     # Creating an admin user (first user created is automatically an admin)
+#     admin_user: DATestUser = UserManager.create(name="admin_user")
+
+#     config = {
+#         "wiki_base": os.environ["CONFLUENCE_TEST_SPACE_URL"],
+#         "space": os.environ["CONFLUENCE_TEST_SPACE"],
+#         "is_cloud": True,
+#         "page_id": "",
+#     }
+
+#     credential = {
+#         "confluence_username": os.environ["CONFLUENCE_USER_NAME"],
+#         "confluence_access_token": os.environ["CONFLUENCE_ACCESS_TOKEN"],
+#     }
+
+#     # store the time before we create the connector so that we know after
+#     # when the indexing should have started
+#     now = datetime.now(timezone.utc)
+
+#     # create connector
+#     cc_pair_1 = CCPairManager.create_from_scratch(
+#         source=DocumentSource.CONFLUENCE,
+#         connector_specific_config=config,
+#         credential_json=credential,
+#         user_performing_action=admin_user,
+#     )
+
+#     CCPairManager.wait_for_indexing(
+#         cc_pair_1, now, timeout=60, user_performing_action=admin_user
+#     )
+
+#     cc_pair_2 = CCPairManager.create_from_scratch(
+#         source=DocumentSource.CONFLUENCE,
+#         connector_specific_config=config,
+#         credential_json=credential,
+#         user_performing_action=admin_user,
+#     )
+
+#     CCPairManager.wait_for_indexing(
+#         cc_pair_2, now, timeout=60, user_performing_action=admin_user
+#     )
+
+#     info_1 = CCPairManager.get_single(cc_pair_1.id)
+#     assert info_1
+
+#     info_2 = CCPairManager.get_single(cc_pair_2.id)
+#     assert info_2
+
+#     assert info_1.num_docs_indexed == info_2.num_docs_indexed
+
+
 def test_connector_deletion(reset: None, vespa_client: vespa_fixture) -> None:
     # Creating an admin user (first user created is automatically an admin)
     admin_user: DATestUser = UserManager.create(name="admin_user")
