feat: support pgvecto.rs #748

Open · wants to merge 2 commits into base: main

Conversation

wwulfric commented Jan 9, 2025

feat: support pgvecto.rs
fix: pgvector duplicate entries problem #739

llamapreview bot left a comment

Auto Pull Request Review from LlamaPReview

1. Overview

1.1 PR Summary

  • Business value and requirements alignment: This PR introduces support for pgvecto.rs, a new vector store implementation, alongside the existing pgvector. It addresses a duplication issue in pgvector (PGVector Duplicates Entries #739).
  • Key components modified:
    • src/vanna/pgvector/__init__.py
    • src/vanna/pgvector/pgvecto_rs.py
    • src/vanna/pgvector/pgvector.py
  • Impact assessment: The introduction of pgvecto.rs adds architectural complexity and requires careful management of data consistency and migration strategies. The fix for the duplication issue in pgvector is critical for maintaining system stability.
  • System dependencies and integration impacts: The changes impact how the application interacts with vector stores, requiring robust error handling and connection management.

1.2 Architecture Changes

  • System design modifications: The new PG_Vecto_rsStore class is added to support pgvecto.rs, introducing a new vector store implementation.
  • Component interactions: The application logic now needs to choose between PG_VectorStore and PG_Vecto_rsStore. The __init__.py modification suggests both will be available.
  • Integration points: The PG_Vecto_rsStore class interacts with the database using SQLAlchemy and the langchain_community.vectorstores.pgvecto_rs.PGVecto_rs library.
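
For context, vanna stores are typically composed with an LLM class via multiple inheritance. Below is a minimal wiring sketch in that pattern; the PG_Vecto_rsStore import path and its config keys are assumptions based on this PR's file layout, not confirmed API:

  # Hypothetical composition following vanna's usual pattern; names for the
  # pgvecto.rs store and its config keys are assumptions from this PR.
  from vanna.openai import OpenAI_Chat
  from vanna.pgvector import PG_Vecto_rsStore

  class MyVanna(PG_Vecto_rsStore, OpenAI_Chat):
      def __init__(self, config=None):
          PG_Vecto_rsStore.__init__(self, config=config)
          OpenAI_Chat.__init__(self, config=config)

  vn = MyVanna(config={
      "connection_string": "postgresql+psycopg://user:pass@localhost:5432/vanna",
      "api_key": "sk-your-key",
      "model": "gpt-4",
  })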

2. Detailed Technical Analysis

2.1 Code Logic Deep-Dive

Core Logic Changes

  • src/vanna/pgvector/pgvecto_rs.py - deterministic_uuid (indirectly used in add_question_sql, add_ddl, add_documentation)
    • Submitted PR Code:
    import ast
    import json
    import logging
    import uuid

    import pandas as pd
    from langchain_core.documents import Document
    from langchain_community.vectorstores.pgvecto_rs import PGVecto_rs
    from sqlalchemy import create_engine, text

    from .. import ValidationError
    from ..base import VannaBase
    from ..types import TrainingPlan, TrainingPlanItem
    from ..utils import deterministic_uuid

    class PG_Vecto_rsStore(VannaBase):
        # ... rest of the class
        def add_question_sql(self, question: str, sql: str, **kwargs) -> str:
            question_sql_json = json.dumps(
                {
                    "question": question,
                    "sql": sql,
                },
                ensure_ascii=False,
            )
            id = deterministic_uuid(question_sql_json) + "-sql"
            createdat = kwargs.get("createdat")
            doc = Document(
                page_content=question_sql_json,
                metadata={"id": id, "createdat": createdat},
            )
            self.sql_collection.add_documents([doc], ids=[doc.metadata["id"]])

            return id

        def add_ddl(self, ddl: str, **kwargs) -> str:
            _id = deterministic_uuid(ddl) + "-ddl"
            doc = Document(
                page_content=ddl,
                metadata={"id": _id},
            )
            self.ddl_collection.add_documents([doc], ids=[doc.metadata["id"]])
            return _id

        def add_documentation(self, documentation: str, **kwargs) -> str:
            _id = deterministic_uuid(documentation) + "-doc"
            doc = Document(
                page_content=documentation,
                metadata={"id": _id},
            )
            self.documentation_collection.add_documents([doc],
                                                        ids=[doc.metadata["id"]])
            return _id
        # ... rest of the class
  • Analysis:
    • The deterministic_uuid function generates content-derived IDs for the vector store entries. This prevents exact re-insertion of the same content, but collisions remain possible if the underlying hash is weak or its output is truncated.
    • The suffixes "-sql", "-ddl", and "-doc" identify the entry type but do not prevent collisions within a type if the base deterministic_uuid output collides.
  • LlamaPReview Suggested Improvements:
    import ast
    import json
    import logging
    import uuid

    import pandas as pd
    from langchain_core.documents import Document
    from langchain_community.vectorstores.pgvecto_rs import PGVecto_rs
    from sqlalchemy import create_engine, text

    from .. import ValidationError
    from ..base import VannaBase
    from ..types import TrainingPlan, TrainingPlanItem
    from ..utils import deterministic_uuid, generate_safe_id

    class PG_Vecto_rsStore(VannaBase):
        # ... rest of the class
        def add_question_sql(self, question: str, sql: str, **kwargs) -> str:
            question_sql_json = json.dumps(
                {
                    "question": question,
                    "sql": sql,
                },
                ensure_ascii=False,
            )
            id = generate_safe_id(deterministic_uuid(question_sql_json), "sql")
            createdat = kwargs.get("createdat")
            doc = Document(
                page_content=question_sql_json,
                metadata={"id": id, "createdat": createdat},
            )
            self.sql_collection.add_documents([doc], ids=[doc.metadata["id"]])

            return id

        def add_ddl(self, ddl: str, **kwargs) -> str:
            _id = generate_safe_id(deterministic_uuid(ddl), "ddl")
            doc = Document(
                page_content=ddl,
                metadata={"id": _id},
            )
            self.ddl_collection.add_documents([doc], ids=[doc.metadata["id"]])
            return _id

        def add_documentation(self, documentation: str, **kwargs) -> str:
            _id = generate_safe_id(deterministic_uuid(documentation), "doc")
            doc = Document(
                page_content=documentation,
                metadata={"id": _id},
            )
            self.documentation_collection.add_documents([doc],
                                                        ids=[doc.metadata["id"]])
            return _id
        # ... rest of the class
  • Improvement rationale:
    • Technical benefits: Introduces a generate_safe_id function to make ID generation more explicit and reduce the risk of collision if the base UUID generation has weaknesses.
    • Risk assessment: Mitigates the risk of data inconsistency due to ID collisions.
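
Note that generate_safe_id does not currently exist in vanna's utils module; the name, signature, and hashing scheme below are one hypothetical shape for the suggested helper:

  import hashlib

  def generate_safe_id(base_uuid: str, kind: str) -> str:
      """Derive a type-scoped ID from a deterministic UUID.

      Hashing the UUID together with its kind keeps IDs distinct across
      collections even if two inputs yield the same base UUID.
      """
      digest = hashlib.sha256(f"{base_uuid}:{kind}".encode("utf-8")).hexdigest()
      return f"{digest[:32]}-{kind}"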

Core Logic Changes

  • src/vanna/pgvector/pgvecto_rs.py - get_similar_question_sql
    • Submitted PR Code:
    class PG_Vecto_rsStore(VannaBase):
        # ...
        def get_similar_question_sql(self, question: str, **kwargs) -> list:
            documents = self.sql_collection.similarity_search(query=question,
                                                               k=self.n_results)
            return [ast.literal_eval(document.page_content) for document in documents]
        # ...
  • Analysis:
    • The code uses ast.literal_eval to parse the page_content of the retrieved documents. While convenient, ast.literal_eval is only safe for evaluating strings containing Python literals. If the data in page_content is malformed or contains non-literal Python code (even unintentionally), ast.literal_eval can raise exceptions, halting the process. This is a potential point of failure not addressed in the initial review.
    • Error handling is missing around the ast.literal_eval call.
  • LlamaPReview Suggested Improvements:
    import json
    class PG_Vecto_rsStore(VannaBase):
        # ...
        def get_similar_question_sql(self, question: str, **kwargs) -> list:
            documents = self.sql_collection.similarity_search(query=question,
                                                               k=self.n_results)
            results = []
            for document in documents:
                try:
                    results.append(json.loads(document.page_content))
                except json.JSONDecodeError:
                    logging.warning(f"Could not decode JSON from document: {document.page_content}")
                    continue # or handle the error as appropriate
            return results
        # ...
  • Improvement rationale:
    • Technical benefits: Replaces ast.literal_eval with json.loads, which is more appropriate given that the add_question_sql method uses json.dumps. It also adds error handling to gracefully manage cases where the page_content is not valid JSON.
    • Risk assessment: Improves the robustness of the data retrieval process by handling potential data corruption or unexpected formats.
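
A short illustration of why the formats diverge: json.dumps emits JSON literals such as true and null, which ast.literal_eval cannot parse even though the same string round-trips cleanly through json.loads (standard-library behavior, shown with made-up data):

  import ast
  import json

  payload = json.dumps({"question": "Is Paris a capital?", "answered": True})
  json.loads(payload)        # {'question': 'Is Paris a capital?', 'answered': True}
  ast.literal_eval(payload)  # raises ValueError: JSON's true is not a Python literal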

Core Logic Changes

  • src/vanna/pgvector/pgvecto_rs.py - get_training_data
    • Submitted PR Code:
    class PG_Vecto_rsStore(VannaBase):
        # ...
        def get_training_data(self, **kwargs) -> pd.DataFrame:
            # Establishing the connection
            engine = create_engine(self.connection_string)

            # Querying the 'langchain_pg_embedding' table
            query_embedding = "SELECT cmetadata, document FROM langchain_pg_embedding"
            df_embedding = pd.read_sql(query_embedding, engine)

            # List to accumulate the processed rows
            processed_rows = []

            # Process each row in the DataFrame
            for _, row in df_embedding.iterrows():
                custom_id = row["cmetadata"]["id"]
                document = row["document"]
                training_data_type = "documentation" if custom_id[-3:] == "doc" else custom_id[-3:]

                if training_data_type == "sql":
                    # Convert the document string to a dictionary
                    try:
                        doc_dict = ast.literal_eval(document)
                        question = doc_dict.get("question")
                        content = doc_dict.get("sql")
                    except (ValueError, SyntaxError):
                        logging.info(
                            f"Skipping row with custom_id {custom_id} due to parsing error.")
                        continue
                elif training_data_type in ["documentation", "ddl"]:
                    question = None  # Default value for question
                    content = document
                else:
                    # If the suffix is not recognized, skip this row
                    logging.info(
                        f"Skipping row with custom_id {custom_id} due to unrecognized training data type.")
                    continue

                # Append the processed data to the list
                processed_rows.append(
                    {"id": custom_id, "question": question, "content": content,
                     "training_data_type": training_data_type}
                )

            # Create a DataFrame from the list of processed rows
            df_processed = pd.DataFrame(processed_rows)

            return df_processed
        # ...
  • Analysis:
    • The get_training_data method infers the training data type based on the last three characters of the custom_id. This is a fragile approach. If the ID generation logic changes or if there are inconsistencies in the suffixes, the type detection will fail. This logic is duplicated from the remove_collection method, indicating a lack of a central type identification mechanism.
    • The method relies on querying the underlying langchain_pg_embedding table directly, bypassing the abstraction provided by the PGVecto_rs class. This creates a tight coupling with the underlying storage schema and could break if the schema changes.
  • LlamaPReview Suggested Improvements:
    import json
    class PG_Vecto_rsStore(VannaBase):
        # ...
        def get_training_data(self, **kwargs) -> pd.DataFrame:
            processed_rows = []
            for collection_name in ["sql", "ddl", "documentation"]:
                collection = self.get_collection(collection_name)
                documents = collection.get() # Assuming a get() method exists in PGVecto_rs or use a different retrieval method
                for doc in documents:
                    training_data_type = collection_name
                    if training_data_type == "sql":
                        try:
                            doc_dict = json.loads(doc.page_content)
                            question = doc_dict.get("question")
                            content = doc_dict.get("sql")
                        except json.JSONDecodeError:
                            logging.warning(f"Could not decode JSON from document: {doc.page_content}")
                            continue
                    else:
                        question = None
                        content = doc.page_content
                    processed_rows.append({
                        "id": doc.metadata.get("id"),
                        "question": question,
                        "content": content,
                        "training_data_type": training_data_type
                    })
            df_processed = pd.DataFrame(processed_rows)
            return df_processed
        # ...
  • Improvement rationale:
    • Technical benefits: Retrieves data using the collection-specific methods, adhering to the class's abstraction and avoiding direct database queries. This makes the code more maintainable and less prone to breaking due to schema changes. It also centralizes the logic for accessing data for each type.
    • Business value: Provides a more reliable way to retrieve training data, reducing the risk of errors due to fragile type detection.

Core Logic Changes

  • src/vanna/pgvector/pgvecto_rs.py - remove_collection
    • Submitted PR Code:
    from sqlalchemy import create_engine, text

    class PG_Vecto_rsStore(VannaBase):
        # ...
        def remove_collection(self, collection_name: str) -> bool:
            engine = create_engine(self.connection_string)

            # Determine the suffix to look for based on the collection name
            suffix_map = {"ddl": "ddl", "sql": "sql", "documentation": "doc"}
            suffix = suffix_map.get(collection_name)

            if not suffix:
                logging.info(
                    "Invalid collection name. Choose from 'ddl', 'sql', or 'documentation'.")
                return False

            # SQL query to delete rows based on the condition
            query = text(
                f"""
                DELETE FROM langchain_pg_embedding
                WHERE cmetadata->>'id' LIKE '%{suffix}'
            """
            )

            # Execute the deletion within a transaction block
            with engine.connect() as connection:
                with connection.begin() as transaction:
                    try:
                        result = connection.execute(query)
                        transaction.commit()  # Explicitly commit the transaction
                        if result.rowcount > 0:
                            logging.info(
                                f"Deleted {result.rowcount} rows from "
                                f"langchain_pg_embedding where collection is {collection_name}."
                            )
                            return True
                        else:
                            logging.info(f"No rows deleted for collection {collection_name}.")
                            return False
                    except Exception as e:
                        logging.error(f"An error occurred: {e}")
                        transaction.rollback()  # Rollback in case of error
                        return False
  • Analysis:
    • The remove_collection method in PG_Vecto_rsStore directly constructs and executes a SQL DELETE statement against the langchain_pg_embedding table based on a suffix. This approach tightly couples the code to the underlying database schema and bypasses any abstraction provided by the langchain_community.vectorstores.pgvecto_rs.PGVecto_rs class. The initial review doesn't highlight this direct SQL manipulation.
    • There's a potential inconsistency with how the PGVecto_rs library is intended to be used. It likely provides its own methods for managing collections or deleting data, which are not being utilized here. This makes maintenance harder and could lead to unexpected behavior if the underlying library's schema or data management strategies change.
  • LlamaPReview Suggested Improvements:
    class PG_Vecto_rsStore(VannaBase):
        # ...
        def remove_collection(self, collection_name: str) -> bool:
            try:
                if collection_name == "sql":
                    self.sql_collection.delete_collection()
                elif collection_name == "ddl":
                    self.ddl_collection.delete_collection()
                elif collection_name == "documentation":
                    self.documentation_collection.delete_collection()
                else:
                    logging.info("Invalid collection name.")
                    return False
                logging.info(f"Collection '{collection_name}' removed successfully.")
                return True
            except Exception as e:
                logging.error(f"Error removing collection '{collection_name}': {e}")
                return False
  • Improvement rationale:
    • Technical benefits: Leverages the delete_collection() method (or equivalent, assuming it exists in the PGVecto_rs library) provided by the vector store class. This aligns with the intended usage of the library, improves abstraction, and reduces direct SQL manipulation.
    • Maintenance and evolution considerations: Makes the code more resilient to changes in the underlying database schema or the PGVecto_rs library's implementation.
    • Risk assessment: Reduces the risk of errors associated with manual SQL construction and execution.

2.2 Implementation Quality

  • Code organization and structure: The new PG_Vecto_rsStore class is well-organized, with clear methods for adding, retrieving, and removing data. The use of deterministic UUIDs for ID generation is a good practice to prevent duplicates.
  • Design patterns usage: The class follows the base class pattern, extending VannaBase and leveraging the PGVecto_rs library for vector store operations.
  • Error handling approach: The error handling in the remove_collection method is robust, with transaction management and logging. However, other methods like get_similar_question_sql lack error handling around data parsing.
  • Resource management: Database connections opened via context managers are closed after use, though a new SQLAlchemy engine is created on each call rather than being reused across calls.

3. Critical Findings

3.1 Potential Issues

  • 🔴 Critical Issues

    • Issue description: The remove_collection method directly constructs and executes a SQL DELETE statement, tightly coupling the code to the underlying database schema and bypassing the abstraction provided by the PGVecto_rs library.
    • Impact: This approach makes maintenance harder and could lead to unexpected behavior if the underlying library's schema or data management strategies change.
    • Recommendation: Leverage the delete_collection() method (or equivalent) provided by the vector store class to align with the intended usage of the library, improve abstraction, and reduce direct SQL manipulation.
  • 🟡 Warnings

    • Warning description: The get_similar_question_sql method uses ast.literal_eval to parse the page_content of the retrieved documents, which is only safe for evaluating strings containing Python literals. If the data in page_content is malformed or contains non-literal Python code, ast.literal_eval can raise exceptions, halting the process.
    • Potential risks: This is a potential point of failure not addressed in the initial review.
    • Suggested improvements: Replace ast.literal_eval with json.loads, which is more appropriate given that the add_question_sql method uses json.dumps. Add error handling to gracefully manage cases where the page_content is not valid JSON.

3.2 Code Quality Concerns

  • Maintainability aspects: The direct SQL manipulation in the remove_collection method reduces maintainability. The lack of error handling around data parsing in methods like get_similar_question_sql could lead to unexpected failures.
  • Readability issues: The code is generally readable, but the direct SQL queries in the remove_collection method could be confusing for future developers.
  • Performance bottlenecks: The get_training_data method loads the entire langchain_pg_embedding table into memory and processes each row, which could be a bottleneck for large datasets; a chunked read (sketched below) would bound memory use.
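
A minimal sketch of bounding memory during that scan, using pandas' chunked reads (table and column names taken from the submitted code; the DSN and chunk size are arbitrary examples):

  import pandas as pd
  from sqlalchemy import create_engine

  connection_string = "postgresql+psycopg://user:pass@localhost:5432/vanna"  # example DSN
  engine = create_engine(connection_string)
  chunks = pd.read_sql(
      "SELECT cmetadata, document FROM langchain_pg_embedding",
      engine,
      chunksize=1000,  # stream 1000 rows at a time instead of the whole table
  )
  for chunk in chunks:
      for _, row in chunk.iterrows():
          ...  # same per-row processing as get_training_data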

4. Security Assessment

  • Authentication/Authorization impacts: Ensure that the database connection strings are securely handled and not exposed in logs or error messages.
  • Data handling concerns: The use of ast.literal_eval in get_similar_question_sql poses a risk if the data is not strictly controlled. Replace it with json.loads for safer data handling.
  • Input validation: Ensure that all inputs, especially those used in SQL queries, are properly validated to prevent SQL injection attacks; see the parameterized sketch after this list.
  • Security best practices: Follow best practices for secure coding, including proper error handling, input validation, and secure management of sensitive information like database connection strings.
  • Potential security risks: Direct SQL manipulation in the remove_collection method poses a risk if not properly validated.
  • Mitigation strategies: Use the abstractions provided by the PGVecto_rs library to avoid direct SQL manipulation.
  • Security testing requirements: Conduct thorough security testing, including penetration testing, to identify and mitigate potential security risks.
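
As a concrete illustration of the input-validation point: the submitted remove_collection interpolates the suffix into its DELETE statement with an f-string. A bound-parameter version (a sketch, assuming the same langchain_pg_embedding schema) keeps the value out of the SQL text:

  from sqlalchemy import create_engine, text

  def delete_by_suffix(connection_string: str, suffix: str) -> int:
      """Delete embeddings whose metadata id ends with the given suffix."""
      engine = create_engine(connection_string)
      # :pattern is bound by the driver, so the suffix value is escaped rather
      # than spliced into the SQL string.
      query = text(
          "DELETE FROM langchain_pg_embedding WHERE cmetadata->>'id' LIKE :pattern"
      )
      with engine.begin() as connection:  # commits on success, rolls back on error
          result = connection.execute(query, {"pattern": f"%{suffix}"})
          return result.rowcount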

5. Testing Strategy

5.1 Test Coverage

  • Unit test analysis: Ensure comprehensive unit tests for the new PG_Vecto_rsStore class, covering all methods and edge cases.
  • Integration test requirements: Conduct integration tests to verify the interaction between the PG_Vecto_rsStore class and the underlying database, as well as the PGVecto_rs library.
  • Edge cases coverage: Test edge cases, including invalid inputs, empty datasets, and large datasets, to ensure the robustness of the implementation.

5.2 Test Recommendations

Suggested Test Cases

  # Example unit test for add_question_sql
  def test_add_question_sql(self):
      store = PG_Vecto_rsStore(config={"connection_string": "test_connection_string"})
      question = "What is the capital of France?"
      sql = "SELECT capital FROM countries WHERE name = 'France';"
      id = store.add_question_sql(question, sql)
      self.assertIsNotNone(id)
      self.assertTrue(id.endswith("-sql"))

  # Example unit test for get_similar_question_sql
  def test_get_similar_question_sql(self):
      store = PG_Vecto_rsStore(config={"connection_string": "test_connection_string"})
      question = "What is the capital of France?"
      similar_questions = store.get_similar_question_sql(question)
      self.assertIsInstance(similar_questions, list)
      for item in similar_questions:
          self.assertIn("question", item)
          self.assertIn("sql", item)
  • Coverage improvements: Ensure that all methods in the PG_Vecto_rsStore class are covered by unit tests, including edge cases and error conditions; a database-free variant of the first test is sketched below.
  • Performance testing needs: Conduct performance testing to evaluate the efficiency of methods like get_training_data, especially with large datasets.
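
The suggested tests above assume a reachable database. A sketch of exercising add_question_sql without one, by stubbing the collection the method writes to (unittest.mock; constructing via __new__ is a shortcut that skips the real __init__):

  from unittest.mock import MagicMock

  def test_add_question_sql_without_db(self):
      # Bypass __init__ so no engine or collections are created, then stub
      # the one attribute add_question_sql touches.
      store = PG_Vecto_rsStore.__new__(PG_Vecto_rsStore)
      store.sql_collection = MagicMock()
      id = store.add_question_sql(
          "What is the capital of France?",
          "SELECT capital FROM countries WHERE name = 'France';",
      )
      store.sql_collection.add_documents.assert_called_once()
      self.assertTrue(id.endswith("-sql"))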

6. Documentation & Maintenance

  • Documentation updates needed: Update the documentation to include the new PG_Vecto_rsStore class and its methods. Provide clear guidelines on how to configure and use the new vector store implementation.
  • Long-term maintenance considerations: Ensure that the PG_Vecto_rsStore class is maintainable by avoiding direct SQL manipulation and leveraging the abstractions provided by the PGVecto_rs library.
  • Technical debt and monitoring requirements: Monitor the performance and stability of the new vector store implementation. Address any technical debt introduced by the direct SQL manipulation in the remove_collection method.

7. Deployment & Operations

  • Deployment impact and strategy: The introduction of the new vector store implementation requires careful deployment planning. Ensure that the database schema is compatible with the new implementation and that any necessary migrations are performed.
  • Key operational considerations: Monitor the performance and stability of the new vector store implementation in the production environment. Ensure that error handling and logging are robust to facilitate troubleshooting.

8. Summary & Recommendations

8.1 Key Action Items

  1. Critical changes required:
    • Leverage the delete_collection() method (or equivalent) provided by the vector store class in the remove_collection method to avoid direct SQL manipulation.
    • Replace ast.literal_eval with json.loads in the get_similar_question_sql method and add error handling.
  2. Important improvements suggested:
    • Ensure comprehensive unit tests for the new PG_Vecto_rsStore class, covering all methods and edge cases.
    • Conduct integration tests to verify the interaction between the PG_Vecto_rsStore class and the underlying database, as well as the PGVecto_rs library.
  3. Best practices to implement:
    • Follow best practices for secure coding, including proper error handling, input validation, and secure management of sensitive information like database connection strings.
    • Avoid direct SQL manipulation and leverage the abstractions provided by the PGVecto_rs library.
  4. Cross-cutting concerns to address:
    • Ensure that the get_training_data method retrieves data using the collection-specific methods, adhering to the class's abstraction and avoiding direct database queries.

8.2 Future Considerations

  • Technical evolution path: Continuously monitor and improve the new vector store implementation based on feedback and performance metrics.
  • Business capability evolution: Evaluate the need for additional vector store implementations or features based on business requirements.
  • System integration impacts: Ensure that the new vector store implementation integrates seamlessly with other components of the system.
