diff --git a/nanolab/loggers/builders.py b/nanolab/loggers/builders.py index 7f57ee2..57f0ca7 100644 --- a/nanolab/loggers/builders.py +++ b/nanolab/loggers/builders.py @@ -12,7 +12,7 @@ def set_timeout(self, timeout): return self @abstractmethod - def build(self): + async def build(self): pass @@ -36,11 +36,13 @@ def set_node_name(self, node_name): self.node_name = node_name return self - def build(self): - if self.rpc_url is None or self.expected_blocks_count is None or self.node_name is None: + async def build(self): + if any(param is None for param in [self.rpc_url, self.expected_blocks_count, self.node_name, self.timeout]): raise Exception("Missing required parameters") - return RPCLogger(self.node_name, self.rpc_url, - self.expected_blocks_count, self.timeout) + logger = RPCLogger(self.node_name, self.rpc_url, + self.expected_blocks_count, self.timeout) + await logger.async_init() + return logger # class WebSocketLoggerBuilder(LoggerBuilder): diff --git a/nanolab/loggers/factories/logger_factory.py b/nanolab/loggers/factories/logger_factory.py index 628da2f..a60e937 100644 --- a/nanolab/loggers/factories/logger_factory.py +++ b/nanolab/loggers/factories/logger_factory.py @@ -5,7 +5,7 @@ class LoggerFactory: @staticmethod - def create_logger(logger_type: str, config: Dict[str, Any]): + async def create_logger(logger_type: str, config: Dict[str, Any]): if logger_type == "rpc": builder = RPCLoggerBuilder() elif logger_type == "websocket": @@ -18,4 +18,4 @@ def create_logger(logger_type: str, config: Dict[str, Any]): if hasattr(builder, f'set_{key}'): getattr(builder, f'set_{key}')(value) - return builder.build() + return await builder.build() diff --git a/nanolab/loggers/logger_manager.py b/nanolab/loggers/logger_manager.py index 1013ace..7ca7183 100644 --- a/nanolab/loggers/logger_manager.py +++ b/nanolab/loggers/logger_manager.py @@ -19,8 +19,8 @@ def __init__(self, logger_params, sink_params: List[Dict]): self.sink_params_list = sink_params async def create_logger_and_storages(self, current_logger_params): - logger = LoggerFactory.create_logger(self.logger_type, - current_logger_params) + logger = await LoggerFactory.create_logger(self.logger_type, + current_logger_params) storages = [] for sink_params in self.sink_params_list: @@ -33,7 +33,7 @@ async def create_logger_and_storages(self, current_logger_params): async def create_loggers(self): for node in self.nodes_config: if (self.included_peers and node["name"] not in self.included_peers - ) or node["name"] in self.excluded_peers: + ) or node["name"] in self.excluded_peers: continue current_params = dict(self.logger_params) current_params["node_name"] = node["name"] diff --git a/nanolab/loggers/sources/rpc_logger.py b/nanolab/loggers/sources/rpc_logger.py index afdf7d5..d99a6c5 100644 --- a/nanolab/loggers/sources/rpc_logger.py +++ b/nanolab/loggers/sources/rpc_logger.py @@ -4,39 +4,40 @@ from datetime import datetime import asyncio from typing import AsyncIterator +import logging class RPCLogger(ILogger): """A logger that fetches logs from an RPC endpoint.""" - FETCH_INTERVAL_DELAY = 0.1 # seconds - def __init__(self, - node_name: str, - rpc_url: str, - expected_blocks_count: int, - timeout: int, - count_start=None, - cemented_start=None): + def __init__(self, node_name: str, rpc_url: str, expected_blocks_count: int, timeout: int): self.node_name = node_name self.rpc_url = rpc_url self.expected_blocks_count = int(expected_blocks_count) self.timeout = int(timeout) self.nanorpc = NanoRpc(self.rpc_url) + # These will be initialized in async_init + self.count_start = None + self.cemented_start = None + self.node_version = None + self.end_block_count = None + self.previous_count = 0 + self.previous_cemented = 0 + self.previous_elapsed_time = 0 + + async def async_init(self, count_start=None, cemented_start=None): if count_start is None or cemented_start is None: - self.count_start, self.cemented_start = self._get_block_count() + self.count_start, self.cemented_start = await self._get_block_count() else: self.count_start = count_start self.cemented_start = cemented_start - node_version = self.nanorpc.version() + node_version = await self.nanorpc.version() self.node_version = f'{node_version["node_vendor"]} {node_version["build_info"][0:7]}' if node_version else "???" self.end_block_count = self.count_start + self.expected_blocks_count - self.previous_count = 0 - self.previous_cemented = 0 - self.previous_elapsed_time = 0 - def _get_block_count(self): - block_count = self.nanorpc.block_count() + async def _get_block_count(self): + block_count = await self.nanorpc.block_count() return int(block_count["count"]), int(block_count["cemented"]) def is_fully_synced(self, cemented): @@ -49,7 +50,7 @@ async def fetch_logs(self) -> AsyncIterator[LogData]: start_time = time.time() timeout_start = time.time() while True: - count, cemented = self._get_block_count() + count, cemented = await self._get_block_count() is_synced = self.is_fully_synced(cemented) percent_cemented = ((cemented - self.cemented_start) / self.expected_blocks_count) * 100 diff --git a/nanolab/publisher/block_asserts.py b/nanolab/publisher/block_asserts.py index 7b1ba46..6a383f3 100644 --- a/nanolab/publisher/block_asserts.py +++ b/nanolab/publisher/block_asserts.py @@ -73,9 +73,9 @@ async def assert_blocks_published(self, blocks): assert all(info['exists'] for info in block_infos), f"Some blocks were not published" - def assert_all_blocks_cemented(self): + async def assert_all_blocks_cemented(self): for rpc in self.nano_rpc_all: - block_count = rpc.block_count() + block_count = await rpc.block_count() assert block_count["count"] == block_count[ "cemented"], "Not all blocks are cemented" diff --git a/nanolab/publisher/block_generator.py b/nanolab/publisher/block_generator.py index 5297f05..153cb7c 100644 --- a/nanolab/publisher/block_generator.py +++ b/nanolab/publisher/block_generator.py @@ -14,11 +14,11 @@ def get_random_account(self): pass @abstractmethod - def blockgen_single_change(self, - source_seed=None, - source_index=None, - source_private_key=None, - rep=None): + async def blockgen_single_change(self, + source_seed=None, + source_index=None, + source_private_key=None, + rep=None): pass @abstractmethod @@ -41,19 +41,19 @@ def get_random_account(self): random_account = self.nano_lib.nanolib_account_data(seed=random_seed) return random_account["account"] - def blockgen_single_change(self, - source_seed=None, - source_index=None, - source_private_key=None, - rep=None): + async def blockgen_single_change(self, + source_seed=None, + source_index=None, + source_private_key=None, + rep=None): rep = rep if rep else self.get_random_account() if source_private_key: - res = self.nano_rpc_default.create_change_block_pkey( + res = await self.nano_rpc_default.create_change_block_pkey( source_private_key, rep, broadcast=self.broadcast) return res if source_seed and source_index >= 0: - return self.nano_rpc_default.create_change_block( + return await self.nano_rpc_default.create_change_block( source_seed, source_index, rep, broadcast=self.broadcast) raise ValueError( @@ -63,11 +63,11 @@ def blockgen_single_change(self, def set_broadcast_blocks(self, broadcast): self.broadcast = broadcast - def create_send_and_open_block(self, send_amount_raw, source_seed, - source_index, destination_seed, - destination_index, representative): + async def create_send_and_open_block(self, send_amount_raw, source_seed, + source_index, destination_seed, + destination_index, representative): # destination = self.nano_rpc_default.generate_account(destination_seed, destination_index) - return self.blockgen_single_account_opener( + return await self.blockgen_single_account_opener( representative=representative, source_seed=source_seed, source_index=source_index, @@ -75,7 +75,7 @@ def create_send_and_open_block(self, send_amount_raw, source_seed, destination_index=destination_index, send_amount=send_amount_raw) - def blockgen_single_account_opener( + async def blockgen_single_account_opener( self, representative=None, source_key=None, # @@ -103,13 +103,13 @@ def blockgen_single_account_opener( seed=source_seed, index=source_index) - send_block = self.nano_rpc_default.create_send_block_pkey( + send_block = await self.nano_rpc_default.create_send_block_pkey( source["private"], destination["account"], send_amount, broadcast=self.broadcast) - open_block = self.nano_rpc_default.create_open_block( + open_block = await self.nano_rpc_default.create_open_block( destination["account"], destination["private"], send_amount, diff --git a/nanolab/publisher/test_case.py b/nanolab/publisher/test_case.py index deb95b4..946595e 100644 --- a/nanolab/publisher/test_case.py +++ b/nanolab/publisher/test_case.py @@ -56,9 +56,9 @@ def __init__(self, config: dict): async def _generate_and_confirm_block(self, index): self.bg_l.set_broadcast_blocks(True) representative = self.bg_l.get_random_account() - res = self.bg_l.blockgen_single_change(source_seed=self.source_seed, - source_index=index, - rep=representative) + res = await self.bg_l.blockgen_single_change(source_seed=self.source_seed, + source_index=index, + rep=representative) try: await self.ba_l.assert_single_block_confirmed_wait( res["hash"], self.timeout_s, 0.05) @@ -101,10 +101,10 @@ def __init__(self, config: dict): async def _generate_and_confirm_block(self, index): self.bg_l.set_broadcast_blocks(True) representative = self.bg_l.get_random_account() - res = self.bg_l.create_send_and_open_block(self.send_amount_raw, - self.source_seed, index, - self.destination_seed, - index, representative) + res = await self.bg_l.create_send_and_open_block(self.send_amount_raw, + self.source_seed, index, + self.destination_seed, + index, representative) try: await self.ba_l.assert_blocks_confirmed_wait( [block["hash"] for block in res], self.timeout_s, 0.05) diff --git a/setup.py b/setup.py index 9abd9cd..c890dab 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ long_description = fh.read() setup(name="nanolab", - version="0.0.22", + version="0.0.23", author="gr0vity", description="testing tool using nanomock", long_description=long_description, diff --git a/unit_tests/test_configs/threaded_mixed_bash.json b/unit_tests/test_configs/threaded_mixed_bash.json index 30c7ac3..0f7d467 100644 --- a/unit_tests/test_configs/threaded_mixed_bash.json +++ b/unit_tests/test_configs/threaded_mixed_bash.json @@ -9,7 +9,7 @@ "type": "python", "class": "TestClass", "method": "long_running_method", - "variables" : {"duration" : 0.5} + "variables" : {"duration" : 1} }, { "type": "bash", diff --git a/unit_tests/test_rpc_logger.py b/unit_tests/test_rpc_logger.py index 92889a5..91bfdbc 100644 --- a/unit_tests/test_rpc_logger.py +++ b/unit_tests/test_rpc_logger.py @@ -35,23 +35,25 @@ def mock_fully_cemented(): yield mock_fully_cemented -def test_get_block_count(mock_version, mock_block_count): +@pytest.mark.asyncio +async def test_get_block_count(mock_version, mock_block_count): logger = RPCLogger(node_name="test_node", rpc_url="http://test_url", expected_blocks_count=99, timeout=10) - count, cemented = logger._get_block_count() + count, cemented = await logger._get_block_count() assert count == 100 assert cemented == 50 -def test_is_fully_synced(mock_version, mock_fully_cemented): +@pytest.mark.asyncio +async def test_is_fully_synced(mock_version, mock_fully_cemented): logger = RPCLogger(node_name="test_node", rpc_url="http://test_url", expected_blocks_count=99, - count_start=1, - cemented_start=1, timeout=10) + await logger.async_init(count_start=1, cemented_start=1) + is_synced = logger.is_fully_synced(cemented=100) assert is_synced is_synced = logger.is_fully_synced(cemented=40) @@ -63,9 +65,9 @@ async def test_fetch_logs(mock_version, mock_fully_cemented): logger = RPCLogger(node_name="test_node", rpc_url="http://test_url", expected_blocks_count=99, - count_start=1, - cemented_start=1, timeout=0.5) + await logger.async_init(count_start=1, cemented_start=1) + logs = [] async for log in logger.fetch_logs(): logs.append(log) @@ -79,6 +81,7 @@ async def test_fetch_logs_timeout(mock_version, mock_fully_cemented): rpc_url="http://test_url", expected_blocks_count=500000, timeout=1) + await logger.async_init(count_start=1, cemented_start=1) logs = [] async for log in logger.fetch_logs(): logs.append(log) @@ -87,13 +90,13 @@ async def test_fetch_logs_timeout(mock_version, mock_fully_cemented): assert logs[0].elapsed_time == 0 -def test_case_with_different_version(mock_fully_cemented): +@pytest.mark.asyncio +async def test_case_with_different_version(mock_fully_cemented): with patch('nanomock.modules.nl_rpc.NanoRpc.version', return_value=None) as mock_version: logger = RPCLogger(node_name="test_node", rpc_url="http://test_url", expected_blocks_count=99, - count_start=1, - cemented_start=1, timeout=0.5) + await logger.async_init(count_start=1, cemented_start=1) assert logger.node_version == "???" diff --git a/unit_tests/test_run.py b/unit_tests/test_run.py index b4f0072..88c45f1 100644 --- a/unit_tests/test_run.py +++ b/unit_tests/test_run.py @@ -116,8 +116,8 @@ def test_threaded_with_group(self): start_time = time.time() run.main() duration = time.time() - start_time - self.assertTrue(duration >= 0.93, f"too short: {duration:.2f} s") - self.assertTrue(duration <= 1.07, f"too long: {duration:.2f} s") + self.assertTrue(duration >= 0.90, f"too short: {duration:.2f} s") + self.assertTrue(duration <= 1.10, f"too long: {duration:.2f} s") @patch('sys.argv', [ 'nanolab', 'run', '--testcase', @@ -263,8 +263,8 @@ def test_threaded_mixed_bash(self): start_time = time.time() run.main() duration = time.time() - start_time - self.assertTrue(duration >= 0.93, f"too short: {duration:.2f} s") - self.assertTrue(duration <= 1.07, f"too long: {duration:.2f} s") + self.assertTrue(duration >= 0.9, f"too short: {duration:.2f} s") + self.assertTrue(duration <= 1.1, f"too long: {duration:.2f} s") @patch('sys.argv', [ 'nanolab', 'run', '--testcase',