Skip to content

Commit

Permalink
♻️ Refactor and finalise
Browse files Browse the repository at this point in the history
  • Loading branch information
ff137 committed Feb 12, 2025
1 parent 5d991da commit e3142c7
Showing 1 changed file with 66 additions and 77 deletions.
143 changes: 66 additions & 77 deletions acapy_agent/ledger/indy_vdr.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,6 @@ def __init__(
socks_proxy: Optional[str] = None,
):
"""Private constructor. Use 'create_instance' to instantiate."""
LOGGER.debug(
"Initializing IndyVdrLedgerPool with name: %s, keepalive: %s, "
"cache_duration: %s, read_only: %s",
name,
keepalive,
cache_duration,
read_only,
)

# Instance attributes
self.name = name
self.keepalive = keepalive
Expand All @@ -108,17 +99,16 @@ def __init__(
self.read_only = read_only
self.socks_proxy = socks_proxy

self.ref_count = 0
self.ref_lock = asyncio.Lock()
self.close_task: Optional[asyncio.Task] = None
self.handle: Optional[Pool] = None
self.cfg_path_cache: Optional[Path] = None
self.genesis_hash_cache: Optional[str] = None
self.genesis_txns_cache = genesis_transactions
self.init_config = bool(genesis_transactions)
self.taa_cache: Optional[str] = None

LOGGER.debug("Pool %s initialization staged", name)
self._ref_count = 0
self._ref_lock = asyncio.Lock()
self._close_task: Optional[asyncio.Task] = None
self._cfg_path_cache: Optional[Path] = None
self._genesis_hash_cache: Optional[str] = None
self._genesis_txns_cache = genesis_transactions
self._init_config = bool(genesis_transactions)

@classmethod
async def get_or_create(
Expand All @@ -132,7 +122,7 @@ async def get_or_create(
read_only: bool = False,
socks_proxy: Optional[str] = None,
) -> "IndyVdrLedgerPool":
"""Asynchronously get or create the singleton instance based on configuration.
"""Asynchronously get or create a singleton instance based on configuration.
Args:
name: The pool ledger configuration name.
Expand Down Expand Up @@ -163,8 +153,6 @@ async def get_or_create(
read_only,
socks_proxy,
)
LOGGER.debug("Generated config key: %s", config_key)

async with cls._lock:
if config_key not in cls._instances:
LOGGER.debug(
Expand All @@ -181,38 +169,45 @@ async def get_or_create(
)
try:
LOGGER.debug("Initializing new IndyVdrLedgerPool instance")
await ledger_pool_instance.initialize()
await ledger_pool_instance._initialize()
except Exception as e:
LOGGER.exception(
"Initialization failed for IndyVdrLedgerPool with config: %s",
LOGGER.error(
"Initialization failed for IndyVdrLedgerPool with config: %s\n%s",
config_key,
exc_info=e,
e,
)
raise
cls._instances[config_key] = ledger_pool_instance
LOGGER.debug(
"Successfully created and stored new IndyVdrLedgerPool instance: %s",
config_key,
"Successfully created new IndyVdrLedgerPool instance with name %s",
name,
)
else:
LOGGER.debug(
"Found existing IndyVdrLedgerPool instance for config: %s",
config_key,
"Found existing IndyVdrLedgerPool instance with name %s", name
)
ledger_pool_instance = cls._instances[config_key]
await ledger_pool_instance._cancel_close_task()

return ledger_pool_instance

async def initialize(self) -> None:
async def _initialize(self) -> None:
"""Initialize the ledger pool."""
LOGGER.debug("Beginning pool initialization")
if self.init_config:
if self._init_config:
LOGGER.debug("Creating pool config with genesis transactions")
await self._create_pool_config(self.genesis_txns_cache, recreate=True)
self.init_config = False
await self._create_pool_config(self._genesis_txns_cache, recreate=True)
self._init_config = False
LOGGER.debug("Opening pool connection")
await self._open()
LOGGER.debug("Pool initialization complete")

async def _cancel_close_task(self) -> None:
"""Cancel any pending close task."""
async with self._ref_lock:
if self._close_task:
self._close_task.cancel()
self._close_task = None

@classmethod
async def release_instance(cls, instance: "IndyVdrLedgerPool") -> None:
Expand All @@ -222,6 +217,7 @@ async def release_instance(cls, instance: "IndyVdrLedgerPool") -> None:
instance: The IndyVdrLedgerPool instance to release.
"""
LOGGER.debug("Beginning instance release process for pool: %s", instance.name)

config_key = (
instance.name,
instance.keepalive,
Expand All @@ -230,59 +226,54 @@ async def release_instance(cls, instance: "IndyVdrLedgerPool") -> None:
instance.read_only,
instance.socks_proxy,
)
LOGGER.debug("Generated config key for release: %s", config_key)

async with cls._lock:
LOGGER.debug(
"Decremented reference count to %s for instance %s",
instance.ref_count,
config_key,
)
if instance.ref_count <= 0:
LOGGER.debug("Reference count is zero or negative, cleaning up instance")
if instance._ref_count <= 0:
LOGGER.debug("Reference count is empty, cleaning up instance")
await instance._close()
del cls._instances[config_key]
LOGGER.debug(
"Successfully removed IndyVdrLedgerPool instance: %s",
config_key,
"Successfully removed IndyVdrLedgerPool instance with name %s",
instance.name,
)
else:
LOGGER.debug(
"Instance still has active references: %s", instance.ref_count
"Instance %s still has active references: %s",
instance.name,
instance._ref_count,
)

@property
def cfg_path(self) -> Path:
"""Get the path to the configuration file, ensuring it's created."""
if not self.cfg_path_cache:
if not self._cfg_path_cache:
LOGGER.debug("Creating configuration path cache")
self.cfg_path_cache = storage_path("vdr", create=True)
LOGGER.debug("Configuration path set to: %s", self.cfg_path_cache)
return self.cfg_path_cache
self._cfg_path_cache = storage_path("vdr", create=True)
LOGGER.debug("Configuration path set to: %s", self._cfg_path_cache)
return self._cfg_path_cache

@property
def genesis_hash(self) -> str:
"""Get the hash of the configured genesis transactions."""
if not self.genesis_hash_cache:
if not self._genesis_hash_cache:
LOGGER.debug("Calculating genesis transactions hash")
self.genesis_hash_cache = _hash_txns(self.genesis_txns)
LOGGER.debug("Genesis hash calculated: %s", self.genesis_hash_cache)
return self.genesis_hash_cache
self._genesis_hash_cache = _hash_txns(self.genesis_txns)
LOGGER.debug("Genesis hash calculated: %s", self._genesis_hash_cache)
return self._genesis_hash_cache

@property
def genesis_txns(self) -> str:
"""Get the configured genesis transactions."""
if not self.genesis_txns_cache:
if not self._genesis_txns_cache:
LOGGER.debug("Loading genesis transactions from file")
try:
path = self.cfg_path.joinpath(self.name, "genesis")
LOGGER.debug("Reading genesis file from: %s", path)
self.genesis_txns_cache = _normalize_txns(open(path).read())
self._genesis_txns_cache = _normalize_txns(open(path).read())
LOGGER.debug("Successfully loaded genesis transactions")
except FileNotFoundError:
LOGGER.error("Pool config '%s' not found", self.name)
raise LedgerConfigError("Pool config '%s' not found", self.name) from None
return self.genesis_txns_cache
return self._genesis_txns_cache

async def _create_pool_config(
self, genesis_transactions: str, recreate: bool = False
Expand Down Expand Up @@ -330,16 +321,16 @@ async def _create_pool_config(
raise LedgerConfigError("Error writing genesis transactions") from err
LOGGER.debug("Successfully wrote pool ledger config '%s'", self.name)

self.genesis_txns_cache = genesis
self._genesis_txns_cache = genesis

async def _open(self) -> None:
"""Open the pool ledger, creating it if necessary."""
LOGGER.debug("Opening pool ledger: %s", self.name)

if self.init_config:
if self._init_config:
LOGGER.debug("Initializing pool config with genesis transactions")
await self._create_pool_config(self.genesis_txns_cache, recreate=True)
self.init_config = False
await self._create_pool_config(self._genesis_txns_cache, recreate=True)
self._init_config = False

genesis_hash = self.genesis_hash
LOGGER.debug("Using genesis hash: %s", genesis_hash)
Expand Down Expand Up @@ -372,16 +363,16 @@ async def _open(self) -> None:

async def context_open(self) -> None:
"""Open the ledger if necessary and increase the number of active references."""
async with self.ref_lock:
if self.close_task:
self.close_task.cancel()
await self._cancel_close_task()
async with self._ref_lock:
if not self.handle:
LOGGER.debug("Opening the pool ledger")
await self._open()
self.ref_count += 1
self._ref_count += 1
self.pending_use_count = max(0, self.pending_use_count - 1) # Clear pending
LOGGER.debug(
"In context_open: Incremented reference count to %s for instance %s",
self.ref_count,
"Incremented reference count to %s for instance %s",
self._ref_count,
self.name,
)

Expand Down Expand Up @@ -413,13 +404,13 @@ async def _close(self) -> None:
LOGGER.exception(
"Failed to close pool ledger after 3 attempts", exc_info=exc
)
self.ref_count += 1 # if we are here, we should have self.ref_lock
self._ref_count += 1 # if we are here, we should have self.ref_lock
LOGGER.debug(
"Re-incremented reference count to %s for instance %s",
self.ref_count,
self._ref_count,
self.name,
)
self.close_task = None
self._close_task = None
raise LedgerError("Exception when closing pool ledger") from exc

async def context_close(self) -> None:
Expand All @@ -442,16 +433,18 @@ async def _keepalive_closer(timeout: int) -> None:
e,
)

async with self.ref_lock:
self.ref_count -= 1
LOGGER.debug("Decremented ref_count to %d.", self.ref_count)
if not self.ref_count:
async with self._ref_lock:
self._ref_count -= 1
LOGGER.debug(
"Decremented ref_count to %d for instance %s", self._ref_count, self.name
)
if self._ref_count <= 0:
if self.keepalive:
LOGGER.debug(
"Scheduling closer coroutine with keepalive=%s",
self.keepalive,
)
self.close_task = asyncio.create_task(
self._close_task = asyncio.create_task(
_keepalive_closer(self.keepalive)
)
else:
Expand Down Expand Up @@ -512,16 +505,13 @@ async def __aenter__(self) -> "IndyVdrLedger":
Returns:
The current instance
"""
LOGGER.debug("Entering IndyVdrLedger context manager")
await super().__aenter__()
await self.pool.context_open()
return self

async def __aexit__(self, exc_type, exc, tb):
"""Context manager exit."""
LOGGER.debug("Exiting IndyVdrLedger context manager")
await self.pool.context_close()
await super().__aexit__(exc_type, exc, tb)

Expand All @@ -541,7 +531,6 @@ async def _submit(
taa_accept: whether to apply TAA acceptance to the (signed, write) request
sign_did: override the signing DID
write_ledger: whether to write the request to the ledger
"""

if not self.pool_handle:
Expand Down

0 comments on commit e3142c7

Please sign in to comment.