Skip to content

Commit

Permalink
Move to async rpc queries
Browse files Browse the repository at this point in the history
  • Loading branch information
gr0vity-dev committed Apr 17, 2024
1 parent 03e11aa commit 4d072a2
Show file tree
Hide file tree
Showing 11 changed files with 76 additions and 70 deletions.
12 changes: 7 additions & 5 deletions nanolab/loggers/builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def set_timeout(self, timeout):
return self

@abstractmethod
def build(self):
async def build(self):
pass


Expand All @@ -36,11 +36,13 @@ def set_node_name(self, node_name):
self.node_name = node_name
return self

def build(self):
if self.rpc_url is None or self.expected_blocks_count is None or self.node_name is None:
async def build(self):
if any(param is None for param in [self.rpc_url, self.expected_blocks_count, self.node_name, self.timeout]):
raise Exception("Missing required parameters")
return RPCLogger(self.node_name, self.rpc_url,
self.expected_blocks_count, self.timeout)
logger = RPCLogger(self.node_name, self.rpc_url,
self.expected_blocks_count, self.timeout)
await logger.async_init()
return logger


# class WebSocketLoggerBuilder(LoggerBuilder):
Expand Down
4 changes: 2 additions & 2 deletions nanolab/loggers/factories/logger_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
class LoggerFactory:

@staticmethod
def create_logger(logger_type: str, config: Dict[str, Any]):
async def create_logger(logger_type: str, config: Dict[str, Any]):
if logger_type == "rpc":
builder = RPCLoggerBuilder()
elif logger_type == "websocket":
Expand All @@ -18,4 +18,4 @@ def create_logger(logger_type: str, config: Dict[str, Any]):
if hasattr(builder, f'set_{key}'):
getattr(builder, f'set_{key}')(value)

return builder.build()
return await builder.build()
6 changes: 3 additions & 3 deletions nanolab/loggers/logger_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ def __init__(self, logger_params, sink_params: List[Dict]):
self.sink_params_list = sink_params

async def create_logger_and_storages(self, current_logger_params):
logger = LoggerFactory.create_logger(self.logger_type,
current_logger_params)
logger = await LoggerFactory.create_logger(self.logger_type,
current_logger_params)

storages = []
for sink_params in self.sink_params_list:
Expand All @@ -33,7 +33,7 @@ async def create_logger_and_storages(self, current_logger_params):
async def create_loggers(self):
for node in self.nodes_config:
if (self.included_peers and node["name"] not in self.included_peers
) or node["name"] in self.excluded_peers:
) or node["name"] in self.excluded_peers:
continue
current_params = dict(self.logger_params)
current_params["node_name"] = node["name"]
Expand Down
33 changes: 17 additions & 16 deletions nanolab/loggers/sources/rpc_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,40 @@
from datetime import datetime
import asyncio
from typing import AsyncIterator
import logging


class RPCLogger(ILogger):
"""A logger that fetches logs from an RPC endpoint."""

FETCH_INTERVAL_DELAY = 0.1 # seconds

def __init__(self,
node_name: str,
rpc_url: str,
expected_blocks_count: int,
timeout: int,
count_start=None,
cemented_start=None):
def __init__(self, node_name: str, rpc_url: str, expected_blocks_count: int, timeout: int):
self.node_name = node_name
self.rpc_url = rpc_url
self.expected_blocks_count = int(expected_blocks_count)
self.timeout = int(timeout)
self.nanorpc = NanoRpc(self.rpc_url)
# These will be initialized in async_init
self.count_start = None
self.cemented_start = None
self.node_version = None
self.end_block_count = None
self.previous_count = 0
self.previous_cemented = 0
self.previous_elapsed_time = 0

async def async_init(self, count_start=None, cemented_start=None):
if count_start is None or cemented_start is None:
self.count_start, self.cemented_start = self._get_block_count()
self.count_start, self.cemented_start = await self._get_block_count()
else:
self.count_start = count_start
self.cemented_start = cemented_start
node_version = self.nanorpc.version()
node_version = await self.nanorpc.version()
self.node_version = f'{node_version["node_vendor"]} {node_version["build_info"][0:7]}' if node_version else "???"
self.end_block_count = self.count_start + self.expected_blocks_count
self.previous_count = 0
self.previous_cemented = 0
self.previous_elapsed_time = 0

def _get_block_count(self):
block_count = self.nanorpc.block_count()
async def _get_block_count(self):
block_count = await self.nanorpc.block_count()
return int(block_count["count"]), int(block_count["cemented"])

def is_fully_synced(self, cemented):
Expand All @@ -49,7 +50,7 @@ async def fetch_logs(self) -> AsyncIterator[LogData]:
start_time = time.time()
timeout_start = time.time()
while True:
count, cemented = self._get_block_count()
count, cemented = await self._get_block_count()
is_synced = self.is_fully_synced(cemented)
percent_cemented = ((cemented - self.cemented_start) /
self.expected_blocks_count) * 100
Expand Down
4 changes: 2 additions & 2 deletions nanolab/publisher/block_asserts.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ async def assert_blocks_published(self, blocks):
assert all(info['exists']
for info in block_infos), f"Some blocks were not published"

def assert_all_blocks_cemented(self):
async def assert_all_blocks_cemented(self):
for rpc in self.nano_rpc_all:
block_count = rpc.block_count()
block_count = await rpc.block_count()
assert block_count["count"] == block_count[
"cemented"], "Not all blocks are cemented"

Expand Down
38 changes: 19 additions & 19 deletions nanolab/publisher/block_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ def get_random_account(self):
pass

@abstractmethod
def blockgen_single_change(self,
source_seed=None,
source_index=None,
source_private_key=None,
rep=None):
async def blockgen_single_change(self,
source_seed=None,
source_index=None,
source_private_key=None,
rep=None):
pass

@abstractmethod
Expand All @@ -41,19 +41,19 @@ def get_random_account(self):
random_account = self.nano_lib.nanolib_account_data(seed=random_seed)
return random_account["account"]

def blockgen_single_change(self,
source_seed=None,
source_index=None,
source_private_key=None,
rep=None):
async def blockgen_single_change(self,
source_seed=None,
source_index=None,
source_private_key=None,
rep=None):
rep = rep if rep else self.get_random_account()
if source_private_key:
res = self.nano_rpc_default.create_change_block_pkey(
res = await self.nano_rpc_default.create_change_block_pkey(
source_private_key, rep, broadcast=self.broadcast)
return res

if source_seed and source_index >= 0:
return self.nano_rpc_default.create_change_block(
return await self.nano_rpc_default.create_change_block(
source_seed, source_index, rep, broadcast=self.broadcast)

raise ValueError(
Expand All @@ -63,19 +63,19 @@ def blockgen_single_change(self,
def set_broadcast_blocks(self, broadcast):
self.broadcast = broadcast

def create_send_and_open_block(self, send_amount_raw, source_seed,
source_index, destination_seed,
destination_index, representative):
async def create_send_and_open_block(self, send_amount_raw, source_seed,
source_index, destination_seed,
destination_index, representative):
# destination = self.nano_rpc_default.generate_account(destination_seed, destination_index)
return self.blockgen_single_account_opener(
return await self.blockgen_single_account_opener(
representative=representative,
source_seed=source_seed,
source_index=source_index,
destination_seed=destination_seed,
destination_index=destination_index,
send_amount=send_amount_raw)

def blockgen_single_account_opener(
async def blockgen_single_account_opener(
self,
representative=None,
source_key=None, #
Expand Down Expand Up @@ -103,13 +103,13 @@ def blockgen_single_account_opener(
seed=source_seed,
index=source_index)

send_block = self.nano_rpc_default.create_send_block_pkey(
send_block = await self.nano_rpc_default.create_send_block_pkey(
source["private"],
destination["account"],
send_amount,
broadcast=self.broadcast)

open_block = self.nano_rpc_default.create_open_block(
open_block = await self.nano_rpc_default.create_open_block(
destination["account"],
destination["private"],
send_amount,
Expand Down
14 changes: 7 additions & 7 deletions nanolab/publisher/test_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ def __init__(self, config: dict):
async def _generate_and_confirm_block(self, index):
self.bg_l.set_broadcast_blocks(True)
representative = self.bg_l.get_random_account()
res = self.bg_l.blockgen_single_change(source_seed=self.source_seed,
source_index=index,
rep=representative)
res = await self.bg_l.blockgen_single_change(source_seed=self.source_seed,
source_index=index,
rep=representative)
try:
await self.ba_l.assert_single_block_confirmed_wait(
res["hash"], self.timeout_s, 0.05)
Expand Down Expand Up @@ -101,10 +101,10 @@ def __init__(self, config: dict):
async def _generate_and_confirm_block(self, index):
self.bg_l.set_broadcast_blocks(True)
representative = self.bg_l.get_random_account()
res = self.bg_l.create_send_and_open_block(self.send_amount_raw,
self.source_seed, index,
self.destination_seed,
index, representative)
res = await self.bg_l.create_send_and_open_block(self.send_amount_raw,
self.source_seed, index,
self.destination_seed,
index, representative)
try:
await self.ba_l.assert_blocks_confirmed_wait(
[block["hash"] for block in res], self.timeout_s, 0.05)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
long_description = fh.read()

setup(name="nanolab",
version="0.0.22",
version="0.0.23",
author="gr0vity",
description="testing tool using nanomock",
long_description=long_description,
Expand Down
2 changes: 1 addition & 1 deletion unit_tests/test_configs/threaded_mixed_bash.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"type": "python",
"class": "TestClass",
"method": "long_running_method",
"variables" : {"duration" : 0.5}
"variables" : {"duration" : 1}
},
{
"type": "bash",
Expand Down
23 changes: 13 additions & 10 deletions unit_tests/test_rpc_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,25 @@ def mock_fully_cemented():
yield mock_fully_cemented


def test_get_block_count(mock_version, mock_block_count):
@pytest.mark.asyncio
async def test_get_block_count(mock_version, mock_block_count):
logger = RPCLogger(node_name="test_node",
rpc_url="http://test_url",
expected_blocks_count=99,
timeout=10)
count, cemented = logger._get_block_count()
count, cemented = await logger._get_block_count()
assert count == 100
assert cemented == 50


def test_is_fully_synced(mock_version, mock_fully_cemented):
@pytest.mark.asyncio
async def test_is_fully_synced(mock_version, mock_fully_cemented):
logger = RPCLogger(node_name="test_node",
rpc_url="http://test_url",
expected_blocks_count=99,
count_start=1,
cemented_start=1,
timeout=10)
await logger.async_init(count_start=1, cemented_start=1)

is_synced = logger.is_fully_synced(cemented=100)
assert is_synced
is_synced = logger.is_fully_synced(cemented=40)
Expand All @@ -63,9 +65,9 @@ async def test_fetch_logs(mock_version, mock_fully_cemented):
logger = RPCLogger(node_name="test_node",
rpc_url="http://test_url",
expected_blocks_count=99,
count_start=1,
cemented_start=1,
timeout=0.5)
await logger.async_init(count_start=1, cemented_start=1)

logs = []
async for log in logger.fetch_logs():
logs.append(log)
Expand All @@ -79,6 +81,7 @@ async def test_fetch_logs_timeout(mock_version, mock_fully_cemented):
rpc_url="http://test_url",
expected_blocks_count=500000,
timeout=1)
await logger.async_init(count_start=1, cemented_start=1)
logs = []
async for log in logger.fetch_logs():
logs.append(log)
Expand All @@ -87,13 +90,13 @@ async def test_fetch_logs_timeout(mock_version, mock_fully_cemented):
assert logs[0].elapsed_time == 0


def test_case_with_different_version(mock_fully_cemented):
@pytest.mark.asyncio
async def test_case_with_different_version(mock_fully_cemented):
with patch('nanomock.modules.nl_rpc.NanoRpc.version',
return_value=None) as mock_version:
logger = RPCLogger(node_name="test_node",
rpc_url="http://test_url",
expected_blocks_count=99,
count_start=1,
cemented_start=1,
timeout=0.5)
await logger.async_init(count_start=1, cemented_start=1)
assert logger.node_version == "???"
8 changes: 4 additions & 4 deletions unit_tests/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ def test_threaded_with_group(self):
start_time = time.time()
run.main()
duration = time.time() - start_time
self.assertTrue(duration >= 0.93, f"too short: {duration:.2f} s")
self.assertTrue(duration <= 1.07, f"too long: {duration:.2f} s")
self.assertTrue(duration >= 0.90, f"too short: {duration:.2f} s")
self.assertTrue(duration <= 1.10, f"too long: {duration:.2f} s")

@patch('sys.argv', [
'nanolab', 'run', '--testcase',
Expand Down Expand Up @@ -263,8 +263,8 @@ def test_threaded_mixed_bash(self):
start_time = time.time()
run.main()
duration = time.time() - start_time
self.assertTrue(duration >= 0.93, f"too short: {duration:.2f} s")
self.assertTrue(duration <= 1.07, f"too long: {duration:.2f} s")
self.assertTrue(duration >= 0.9, f"too short: {duration:.2f} s")
self.assertTrue(duration <= 1.1, f"too long: {duration:.2f} s")

@patch('sys.argv', [
'nanolab', 'run', '--testcase',
Expand Down

0 comments on commit 4d072a2

Please sign in to comment.