Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved handling of multidoc insertion errors for vector store #110

Merged
merged 6 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion libs/astradb/langchain_astradb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from langchain_astradb.document_loaders import AstraDBLoader
from langchain_astradb.graph_vectorstores import AstraDBGraphVectorStore
from langchain_astradb.storage import AstraDBByteStore, AstraDBStore
from langchain_astradb.vectorstores import AstraDBVectorStore
from langchain_astradb.vectorstores import AstraDBVectorStore, AstraDBVectorStoreError

__all__ = [
"AstraDBByteStore",
Expand All @@ -18,5 +18,6 @@
"AstraDBSemanticCache",
"AstraDBStore",
"AstraDBVectorStore",
"AstraDBVectorStoreError",
"CollectionVectorServiceOptions",
]
60 changes: 52 additions & 8 deletions libs/astradb/langchain_astradb/vectorstores.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
DEFAULT_INDEXING_OPTIONS = {"allow": ["metadata"]}
# API error code marking a duplicate-id failure during bulk insertions;
# errors carrying this code are left out of the insertion error messages
DOCUMENT_ALREADY_EXISTS_API_ERROR_CODE = "DOCUMENT_ALREADY_EXISTS"
# max number of individual API error messages spelled out in full within an
# insertion error message (any excess is only reported as a count)
MAX_SHOWN_INSERTION_ERRORS = 8

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -146,6 +148,46 @@ def _validate_autodetect_init_params(
raise ValueError(msg)


def _insertmany_error_message(err: InsertManyException) -> str:
    """Format an astrapy insert exception into an error message.

    This utility prepares a detailed message from an astrapy InsertManyException,
    to be used in raising an exception within a vectorstore multiple insertion.

    Duplicate-id specific errors are filtered out (the vector store could
    actually handle them, if they were the only ones), and so are error
    descriptors that carry no message text. At most MAX_SHOWN_INSERTION_ERRORS
    messages are spelled out; any excess is summarized as a count.

    Args:
        err: the exception raised by the bulk insertion, whose
            ``error_descriptors`` are inspected.

    Returns:
        The full message for the exception raised by the vector store.
    """
    # keep only the non-duplicate errors that actually have something to say
    reportable_messages = [
        edesc.message
        for edesc in err.error_descriptors
        if edesc.error_code != DOCUMENT_ALREADY_EXISTS_API_ERROR_CODE
        if edesc.message
    ]
    shown_messages = reportable_messages[:MAX_SHOWN_INSERTION_ERRORS]
    num_residual = len(reportable_messages) - len(shown_messages)

    if shown_messages:
        details = "; ".join(shown_messages)
    else:
        # every remaining descriptor lacks a message: say so explicitly
        # rather than leaving the list of errors silently empty
        details = "(no error message was provided by the API)"

    msg_parts = [
        "Cannot insert documents. The Data API returned the following error(s): ",
        details,
    ]
    if num_residual > 0:
        msg_parts.append(f". (Note: {num_residual} further errors omitted.)")
    # always point the reader at the chained astrapy exception for full detail
    msg_parts.append(
        " (Full API error in '<this-exception>.__cause__.error_descriptors'"
        f": ignore '{DOCUMENT_ALREADY_EXISTS_API_ERROR_CODE}'.)"
    )
    return "".join(msg_parts)


class AstraDBVectorStoreError(Exception):
    """Error raised by an AstraDBVectorStore while carrying out an operation.

    Any operational failure that occurs while the vector store is performing
    one of its actions is represented by this exception.
    """


class AstraDBVectorStore(VectorStore):
"""AstraDB vector store integration.

Expand Down Expand Up @@ -955,7 +997,7 @@ def _get_missing_from_batch(
) -> tuple[list[str], list[DocDict]]:
if "status" not in insert_result:
msg = f"API Exception while running bulk insertion: {insert_result}"
raise ValueError(msg)
raise AstraDBVectorStoreError(msg)
batch_inserted = insert_result["status"]["insertedIds"]
# estimation of the preexisting documents that failed
missed_inserted_ids = {document["_id"] for document in document_batch} - set(
Expand All @@ -969,7 +1011,7 @@ def _get_missing_from_batch(
)
if num_errors != len(missed_inserted_ids) or unexpected_errors:
msg = f"API Exception while running bulk insertion: {errors}"
raise ValueError(msg)
raise AstraDBVectorStoreError(msg)
# deal with the missing insertions as upserts
missing_from_batch = [
document
Expand Down Expand Up @@ -1062,7 +1104,8 @@ def add_texts(
if document["_id"] not in inserted_ids_set
]
else:
raise
full_err_message = _insertmany_error_message(err)
raise AstraDBVectorStoreError(full_err_message) from err

# if necessary, replace docs for the non-inserted ids
if ids_to_replace:
Expand Down Expand Up @@ -1102,7 +1145,7 @@ def _replace_document(
"AstraDBVectorStore.add_texts could not insert all requested "
f"documents ({missing} failed replace_one calls)"
)
raise ValueError(msg)
raise AstraDBVectorStoreError(msg)
return inserted_ids

@override
Expand Down Expand Up @@ -1191,7 +1234,8 @@ async def aadd_texts(
if document["_id"] not in inserted_ids_set
]
else:
raise
full_err_message = _insertmany_error_message(err)
raise AstraDBVectorStoreError(full_err_message) from err

# if necessary, replace docs for the non-inserted ids
if ids_to_replace:
Expand Down Expand Up @@ -1232,7 +1276,7 @@ async def _replace_document(
"AstraDBVectorStore.add_texts could not insert all requested "
f"documents ({missing} failed replace_one calls)"
)
raise ValueError(msg)
raise AstraDBVectorStoreError(msg)
return inserted_ids

def update_metadata(
Expand Down Expand Up @@ -1919,7 +1963,7 @@ async def _asimilarity_search_with_embedding_by_sort(
sort_vector = await async_cursor.get_sort_vector()
if sort_vector is None:
msg = "Unable to retrieve the server-side embedding of the query."
raise ValueError(msg)
raise AstraDBVectorStoreError(msg)
query_embedding = sort_vector

return (
Expand Down Expand Up @@ -1959,7 +2003,7 @@ def _similarity_search_with_embedding_by_sort(
sort_vector = cursor.get_sort_vector()
if sort_vector is None:
msg = "Unable to retrieve the server-side embedding of the query."
raise ValueError(msg)
raise AstraDBVectorStoreError(msg)
query_embedding = sort_vector

return (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@

import pytest
from astrapy.authentication import EmbeddingAPIKeyHeaderProvider, StaticTokenProvider
from astrapy.exceptions import InsertManyException

from langchain_astradb.utils.astradb import SetupMode
from langchain_astradb.vectorstores import AstraDBVectorStore
from langchain_astradb.vectorstores import AstraDBVectorStore, AstraDBVectorStoreError

from .conftest import (
EPHEMERAL_CUSTOM_IDX_NAME_D2,
Expand Down Expand Up @@ -510,7 +509,7 @@ def test_astradb_vectorstore_vectorize_headers_precedence_stringheader(
)
# More specific messages are provider-specific, such as OpenAI returning:
# "... Incorrect API key provided: verywrong ..."
with pytest.raises(InsertManyException, match="Embedding Provider returned"):
with pytest.raises(AstraDBVectorStoreError, match="verywrong"):
v_store.add_texts(["Failing"])

@pytest.mark.skipif(
Expand Down Expand Up @@ -538,5 +537,5 @@ def test_astradb_vectorstore_vectorize_headers_precedence_headerprovider(
)
# More specific messages are provider-specific, such as OpenAI returning:
# "... Incorrect API key provided: verywrong ..."
with pytest.raises(InsertManyException, match="Embedding Provider returned"):
with pytest.raises(AstraDBVectorStoreError, match="verywrong"):
v_store.add_texts(["Failing"])
1 change: 1 addition & 0 deletions libs/astradb/tests/unit_tests/test_imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"AstraDBGraphVectorStore",
"AstraDBLoader",
"AstraDBVectorStore",
"AstraDBVectorStoreError",
"CollectionVectorServiceOptions",
]

Expand Down