Skip to content

Commit

Permalink
Cleanup Test Resources
Browse files Browse the repository at this point in the history
Signed-off-by: Elliot Scribner <[email protected]>
  • Loading branch information
ejscribner committed Feb 7, 2025
1 parent 3b7e870 commit 64f2616
Showing 1 changed file with 87 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import atexit
import json
import os
import signal
import threading
import uuid
from datetime import timedelta
from typing import Dict, List, Optional
Expand Down Expand Up @@ -29,10 +32,31 @@


class CouchbaseColumnarDataSourceCreator(DataSourceCreator):
    """DataSourceCreator for Couchbase Columnar used by the integration tests.

    Creates Analytics collections per test project and cleans them up on
    teardown; a shared, lazily-created cluster handle is reused across
    instances (see get_cluster).
    """

    # Process-wide guard so cleanup_all() runs at most once per shutdown.
    _shutting_down = False
    # Lazily-created shared cluster handle; always access via get_cluster().
    _cluster = None
    # Protects _cluster creation when tests run from multiple threads.
    _cluster_lock = threading.Lock()
@classmethod
def get_cluster(cls):
with cls._cluster_lock:
if cls._cluster is None:
cred = Credential.from_username_and_password(
os.environ["COUCHBASE_COLUMNAR_USER"],
os.environ["COUCHBASE_COLUMNAR_PASSWORD"],
)
timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120))
cls._cluster = Cluster.create_instance(
os.environ["COUCHBASE_COLUMNAR_CONNECTION_STRING"],
cred,
ClusterOptions(timeout_options=timeout_opts),
)
return cls._cluster

def __init__(self, project_name: str, *args, **kwargs):
def __init__(self, project_name: str, **kwargs):
super().__init__(project_name)
self.project_name = project_name
self.collections: List[str] = []

self.offline_store_config = CouchbaseColumnarOfflineStoreConfig(
type="couchbase",
connection_string=os.environ["COUCHBASE_COLUMNAR_CONNECTION_STRING"],
Expand Down Expand Up @@ -64,18 +88,8 @@ def format_row(row):

collection_name = self.get_prefixed_collection_name(destination_name)

cred = Credential.from_username_and_password(
self.offline_store_config.user, self.offline_store_config.password
)
timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120))
cluster = Cluster.create_instance(
self.offline_store_config.connection_string,
cred,
ClusterOptions(timeout_options=timeout_opts),
)

create_cluster_query = f"CREATE ANALYTICS COLLECTION {COUCHBASE_COLUMNAR_DATABASE}.{COUCHBASE_COLUMNAR_SCOPE}.{collection_name} IF NOT EXISTS PRIMARY KEY(pk: UUID) AUTOGENERATED;"
cluster.execute_query(
self.get_cluster().execute_query(
create_cluster_query,
QueryOptions(timeout=timedelta(seconds=self.offline_store_config.timeout)),
)
Expand All @@ -88,7 +102,7 @@ def format_row(row):
{values_clause}
])
"""
cluster.execute_query(
self.get_cluster().execute_query(
insert_query,
QueryOptions(timeout=timedelta(seconds=self.offline_store_config.timeout)),
)
Expand Down Expand Up @@ -126,22 +140,52 @@ def create_offline_store_config(self) -> FeastConfigBaseModel:
def get_prefixed_collection_name(self, suffix: str) -> str:
return f"{self.project_name}_{suffix}"

def teardown(self):
cred = Credential.from_username_and_password(
self.offline_store_config.user, self.offline_store_config.password
)

timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120))
cluster = Cluster.create_instance(
self.offline_store_config.connection_string,
cred,
ClusterOptions(timeout_options=timeout_opts),
)
@classmethod
def get_dangling_collections(cls) -> List[str]:
query = """
SELECT VALUE d.DatabaseName || '.' || d.DataverseName || '.' || d.DatasetName
FROM System.Metadata.`Dataset` d
WHERE d.DataverseName <> "Metadata"
AND (REGEXP_CONTAINS(d.DatasetName, "integration_test_.*")
OR REGEXP_CONTAINS(d.DatasetName, "feast_entity_df_.*"));
"""
try:
res = cls.get_cluster().execute_query(query)
return res.get_all_rows()
except Exception as e:
print(f"Error fetching collections: {e}")
return []

@classmethod
def cleanup_all(cls):
if cls._shutting_down:
return
cls._shutting_down = True
try:
collections = cls.get_dangling_collections()
if len(collections) == 0:
print("No collections to clean up.")
return

print(f"Found {len(collections)} collections to clean up.")
if len(collections) > 5:
print("This may take a few minutes...")
for collection in collections:
try:
query = f"DROP COLLECTION {collection} IF EXISTS;"
cls.get_cluster().execute_query(query)
print(f"Dropped collection: {collection}")
except Exception as e:
print(f"Error dropping collection {collection}: {e}")
finally:
print("Cleanup complete.")
cls._shutting_down = False

def teardown(self):
for collection in self.collections:
query = f"DROP COLLECTION {COUCHBASE_COLUMNAR_DATABASE}.{COUCHBASE_COLUMNAR_SCOPE}.`{collection}` IF EXISTS;"
try:
cluster.execute_query(
self.get_cluster().execute_query(
query,
QueryOptions(
timeout=timedelta(seconds=self.offline_store_config.timeout)
Expand All @@ -150,3 +194,20 @@ def teardown(self):
print(f"Successfully dropped collection: {collection}")
except Exception as e:
print(f"Error dropping collection {collection}: {e}")


def cleanup_handler(signum, frame):
    """Run resource cleanup, then let the process die from the signal.

    signal.default_int_handler raises KeyboardInterrupt, which is only the
    correct behavior for SIGINT; for SIGTERM (and anything else) we restore
    the default disposition and re-raise the signal so the process exits
    with the proper signal status.
    """
    print("\nCleaning up dangling resources...")
    try:
        CouchbaseColumnarDataSourceCreator.cleanup_all()
    except Exception as e:
        print(f"Error during cleanup: {e}")
    finally:
        if signum == signal.SIGINT:
            # Propagate as KeyboardInterrupt, matching default Ctrl-C behavior.
            signal.default_int_handler(signum, frame)
        else:
            # Restore the default handler and re-deliver the signal to self.
            signal.signal(signum, signal.SIG_DFL)
            os.kill(os.getpid(), signum)


# Register both SIGINT and SIGTERM handlers
signal.signal(signal.SIGINT, cleanup_handler)
signal.signal(signal.SIGTERM, cleanup_handler)
# Also run the cleanup on normal interpreter shutdown; cleanup_all guards
# against running twice when a signal handler already performed it.
atexit.register(CouchbaseColumnarDataSourceCreator.cleanup_all)

0 comments on commit 64f2616

Please sign in to comment.