diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py
index 157e71fe49..5fe96d2ef5 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py
@@ -1,5 +1,8 @@
+import atexit
 import json
 import os
+import signal
+import threading
 import uuid
 from datetime import timedelta
 from typing import Dict, List, Optional
@@ -29,10 +32,31 @@ class CouchbaseColumnarDataSourceCreator(DataSourceCreator):
-    collections: List[str] = []
+    _shutting_down = False
+    _cluster = None
+    _cluster_lock = threading.Lock()
+
+    @classmethod
+    def get_cluster(cls):
+        with cls._cluster_lock:
+            if cls._cluster is None:
+                cred = Credential.from_username_and_password(
+                    os.environ["COUCHBASE_COLUMNAR_USER"],
+                    os.environ["COUCHBASE_COLUMNAR_PASSWORD"],
+                )
+                timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120))
+                cls._cluster = Cluster.create_instance(
+                    os.environ["COUCHBASE_COLUMNAR_CONNECTION_STRING"],
+                    cred,
+                    ClusterOptions(timeout_options=timeout_opts),
+                )
+        return cls._cluster
 
-    def __init__(self, project_name: str, *args, **kwargs):
+    def __init__(self, project_name: str, **kwargs):
         super().__init__(project_name)
+        self.project_name = project_name
+        self.collections: List[str] = []
+
         self.offline_store_config = CouchbaseColumnarOfflineStoreConfig(
             type="couchbase",
             connection_string=os.environ["COUCHBASE_COLUMNAR_CONNECTION_STRING"],
             user=os.environ["COUCHBASE_COLUMNAR_USER"],
             password=os.environ["COUCHBASE_COLUMNAR_PASSWORD"],
             timeout=120,
@@ -64,18 +88,8 @@ def format_row(row):
 
         collection_name = self.get_prefixed_collection_name(destination_name)
 
-        cred = Credential.from_username_and_password(
-            self.offline_store_config.user, self.offline_store_config.password
-        )
-        timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120))
-        cluster = Cluster.create_instance(
-            self.offline_store_config.connection_string,
-            cred,
-            ClusterOptions(timeout_options=timeout_opts),
-        )
-
         create_cluster_query = f"CREATE ANALYTICS COLLECTION {COUCHBASE_COLUMNAR_DATABASE}.{COUCHBASE_COLUMNAR_SCOPE}.{collection_name} IF NOT EXISTS PRIMARY KEY(pk: UUID) AUTOGENERATED;"
-        cluster.execute_query(
+        self.get_cluster().execute_query(
             create_cluster_query,
             QueryOptions(timeout=timedelta(seconds=self.offline_store_config.timeout)),
         )
@@ -88,7 +102,7 @@ def format_row(row):
             {values_clause}
         ])
         """
-        cluster.execute_query(
+        self.get_cluster().execute_query(
             insert_query,
             QueryOptions(timeout=timedelta(seconds=self.offline_store_config.timeout)),
         )
@@ -126,22 +140,52 @@ def create_offline_store_config(self) -> FeastConfigBaseModel:
 
     def get_prefixed_collection_name(self, suffix: str) -> str:
         return f"{self.project_name}_{suffix}"
 
-    def teardown(self):
-        cred = Credential.from_username_and_password(
-            self.offline_store_config.user, self.offline_store_config.password
-        )
-
-        timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120))
-        cluster = Cluster.create_instance(
-            self.offline_store_config.connection_string,
-            cred,
-            ClusterOptions(timeout_options=timeout_opts),
-        )
+    @classmethod
+    def get_dangling_collections(cls) -> List[str]:
+        query = """
+        SELECT VALUE d.DatabaseName || '.' || d.DataverseName || '.' || d.DatasetName
+        FROM System.Metadata.`Dataset` d
+        WHERE d.DataverseName <> "Metadata"
+        AND (REGEXP_CONTAINS(d.DatasetName, "integration_test_.*")
+        OR REGEXP_CONTAINS(d.DatasetName, "feast_entity_df_.*"));
+        """
+        try:
+            res = cls.get_cluster().execute_query(query)
+            return res.get_all_rows()
+        except Exception as e:
+            print(f"Error fetching collections: {e}")
+            return []
+
+    @classmethod
+    def cleanup_all(cls):
+        if cls._shutting_down:
+            return
+        cls._shutting_down = True
+        try:
+            collections = cls.get_dangling_collections()
+            if len(collections) == 0:
+                print("No collections to clean up.")
+                return
+
+            print(f"Found {len(collections)} collections to clean up.")
+            if len(collections) > 5:
+                print("This may take a few minutes...")
+            for collection in collections:
+                try:
+                    query = f"DROP COLLECTION {collection} IF EXISTS;"
+                    cls.get_cluster().execute_query(query)
+                    print(f"Dropped collection: {collection}")
+                except Exception as e:
+                    print(f"Error dropping collection {collection}: {e}")
+        finally:
+            print("Cleanup complete.")
+            cls._shutting_down = False
+    def teardown(self):
         for collection in self.collections:
             query = f"DROP COLLECTION {COUCHBASE_COLUMNAR_DATABASE}.{COUCHBASE_COLUMNAR_SCOPE}.`{collection}` IF EXISTS;"
             try:
-                cluster.execute_query(
+                self.get_cluster().execute_query(
                     query,
                     QueryOptions(
                         timeout=timedelta(seconds=self.offline_store_config.timeout)
@@ -150,3 +194,22 @@ def teardown(self):
                 print(f"Successfully dropped collection: {collection}")
             except Exception as e:
                 print(f"Error dropping collection {collection}: {e}")
+
+
+def cleanup_handler(signum, frame):
+    print("\nCleaning up dangling resources...")
+    try:
+        CouchbaseColumnarDataSourceCreator.cleanup_all()
+    except Exception as e:
+        print(f"Error during cleanup: {e}")
+    finally:
+        # Restore the default disposition and re-deliver the signal so the
+        # process exits with the correct status for SIGINT and SIGTERM alike.
+        signal.signal(signum, signal.SIG_DFL)
+        signal.raise_signal(signum)
+
+
+# Register both SIGINT and SIGTERM handlers
+signal.signal(signal.SIGINT, cleanup_handler)
+signal.signal(signal.SIGTERM, cleanup_handler)
+atexit.register(CouchbaseColumnarDataSourceCreator.cleanup_all)