Skip to content

Commit

Permalink
wip logging console based
Browse files Browse the repository at this point in the history
  • Loading branch information
gr0vity committed Apr 25, 2023
1 parent 576fdef commit b4fef38
Show file tree
Hide file tree
Showing 15 changed files with 384 additions and 107 deletions.
3 changes: 2 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
"console": "integratedTerminal",
"cwd": "${workspaceFolder}",
"env": {
"SNIPPET_FILE": "unit_tests/test_snippets.json"
"SNIPPET_FILE": "app/snippets.json",
"CONFIG_FILE" : "app/configs/3node_network/easy_setup.json"
}
}
]
Expand Down
27 changes: 22 additions & 5 deletions app/configs/3node_network/easy_setup.json
Original file line number Diff line number Diff line change
@@ -1,15 +1,32 @@
{
"docker_tags": [
"nanocurrency/nano:V24.0"
"db35_rev_a302_fde8", "nanocurrency/nano-beta:V25.0DB35"
],
"commands": [
"commands": [
{
"skip": false,
"type": "snippet",
"key": "setup_easy",
"key": "setup_ledger_and_config",
"variables": {
"NODES_CONFIG": "app/3node_network/nanolocal_config.toml",
"REMOTE_IP": "127.0.0.1"
"NL_CONFIG": "app/configs/3node_network/nl_config_default.toml",
"REMOTE_IP": "127.0.0.1",
"SETUP_NODES": "nl_genesis nl_pr1 nl_pr2 nl_pr3",
"LEDGER": "app/data/ledgers/3pr_init.ldb"
}
},
{
"skip": false,
"type": "python",
"class": "NodeInteraction",
"method": "publish_blocks_test",
"variables": {
"params": {
"blocks_path": "app/data/blocks/3node_net.bintree.50k.json",
"bps": 5000
},
"logger_type": "rpc",
"logger_timeout": 180,
"logger_include_peers": ["nl_genesis","nl_pr1"]
}
}
]
Expand Down
44 changes: 0 additions & 44 deletions app/configs/3node_network/nanolocal_config.toml

This file was deleted.

109 changes: 94 additions & 15 deletions app/nano_node_interaction.py → app/node_interaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,104 @@

os.environ["NL_APP_DIR"] = "submodules/nanolocal/nanolocal"

#from submodules.xnolib.pynanocoin import *
from submodules.xnomin.peers import get_connected_socket_endpoint, message_header, block_state, block_type_enum, message_type_enum, network_id, message_type, get_peers_from_service
from submodules.xnomin.handshake import node_handshake_id

from nanolocal.common.nl_parse_config import ConfigParser, ConfigReadWrite
from nanolocal.common.nl_rpc import NanoRpc
from app.node_tools import StatsLogger

from typing import Any, Dict, List
from typing import Any, Dict, List, Optional
import asyncio
import random
import time


def xnolib_publish():
params = {
"blocks_path": "app/data/blocks/3node_net.bintree.50k.json",
"bps": 100
}
import itertools


def load_nodes_config():
"""Load nodes configuration from a file."""
config_parser = ConfigParser()
return config_parser.get_nodes_config()


async def create_loggers(hashes,
logger_type=None,
logger_timeout=None,
included_peers=None,
excluded_peers=None):
"""
Create loggers for nodes.
:param hashes: list of hashes to log
:param logger_type: type of logger to use
:param logger_timeout: logger timeout
:param included_peers: list of peers to include in logging
:param excluded_peers: list of peers to exclude from logging
:return: list of logger instances
"""
nodes_config = load_nodes_config()
loggers = []
for node in nodes_config:
if included_peers and node["name"] not in included_peers:
continue
if excluded_peers and node["name"] in excluded_peers:
continue

nanorpc = NanoRpc(node["rpc_url"])
node_version = nanorpc.version()
formatted_node_version = f'{node_version["node_vendor"]} {node_version["build_info"][0:7]}'
start_block_count = nanorpc.block_count()

logger = StatsLogger(logger_type,
node["name"],
formatted_node_version,
hashes,
start_block_count,
timeout=logger_timeout,
ws_url=node["ws_url"],
rpc_url=node["rpc_url"])
loggers.append(logger)
return loggers


async def start_loggers(loggers):
"""
Start loggers.
:param loggers: list of logger instances
"""
logger_tasks = [asyncio.create_task(logger.start()) for logger in loggers]
await asyncio.gather(*logger_tasks)


async def xnolib_publish(params: dict,
logger_type=None,
logger_timeout=None,
included_peers=None,
excluded_peers=None):
"""
Publish data using Xnolib and enable logging.
:param logger_type: type of logger to use
:param logger_timeout: logger timeout
:param included_peers: list of peers to include in logging
:param excluded_peers: list of peers to exclude from logging
"""
# params = {
# "blocks_path": "app/data/blocks/3node_net.bintree.50k.json",
# "bps": 1000
# }
block_lists = get_blocks_from_disk(params)
sp = SocketPublish(params)
msg_count = asyncio.run(sp.run(block_lists))
print(msg_count)
messages, hashes = sp.flatten_messages(block_lists)

loggers = await create_loggers(hashes, logger_type, logger_timeout,
included_peers, excluded_peers)
enable_logging_task = asyncio.create_task(start_loggers(loggers))
sp_task = asyncio.create_task(sp.run(messages))

# Await both tasks concurrently
await asyncio.gather(enable_logging_task, sp_task)


def read_blocks_from_disk(path, seeds=False, hashes=False, blocks=False):
Expand Down Expand Up @@ -133,7 +210,7 @@ def __set_sockets_handshake(self):
hdr.set_block_type(block_type_enum.state)
all_peers = get_peers_from_service(ctx)
sockets = []
# Handshake with all voting peers
# Handshake with all peers
for peer in all_peers:
s = self.handshake_peer(str(peer.ip), peer.port, ctx)
if s is not None:
Expand All @@ -152,7 +229,10 @@ def flatten_messages(self,
for block in block_list
]
messages.extend(publish_msg)
return messages

block_hashes = list(itertools.chain(*block_lists['h']))

return messages, block_hashes

async def publish_message(self, socket: Dict[str, Any],
messages: List[Any]) -> None:
Expand Down Expand Up @@ -218,8 +298,7 @@ def create_default_tasks(self, sockets, messages):
tasks = [self.publish_message(socket, messages) for socket in sockets]
return tasks

async def run(self, block_lists: List[List[Dict[str, Any]]]) -> int:
messages = self.flatten_messages(block_lists)
async def run(self, messages: List[Any]) -> int:
tasks = self.create_publish_tasks(self.sockets, messages)
await asyncio.gather(*tasks)
return len(messages)
Loading

0 comments on commit b4fef38

Please sign in to comment.