Skip to content

Commit

Permalink
connect to peers in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
gr0vity committed Nov 20, 2024
1 parent b3a58ba commit f664563
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 21 deletions.
31 changes: 22 additions & 9 deletions nanolab/node_interaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
from nanolab.xnomin.handshake import node_handshake_id
from nanomock.modules.nl_parse_config import ConfigReadWrite
from nanolab.src.utils import get_config_parser
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List
import asyncio
import random
import time
import itertools
import threading


from nanolab.loggers.logger_manager import LoggingManager


Expand Down Expand Up @@ -139,20 +141,31 @@ def get_xnolib_context(self, peers=None):

def __set_sockets_handshake(self):
ctx = self.get_xnolib_context(peers=self.peers)

msgtype = message_type_enum.publish
hdr = message_header(ctx['net_id'], [21, 21, 20],
message_type(msgtype), 0)
message_type(msgtype), 0)
hdr.set_block_type(block_type_enum.state)
all_peers = get_peers_from_service(ctx)
sockets = []
# Handshake with all peers
for peer in all_peers:

def handshake_peer_wrapper(peer):
s = self.handshake_peer(str(peer.ip), peer.port, ctx)
if s is not None:
sockets.append({
return {
"socket": s,
"peer": str(peer.ip) + ":" + str(peer.port)
})
"peer": f"{peer.ip}:{peer.port}"
}
return None

# Adjust max_workers according to your needs
with ThreadPoolExecutor(max_workers=20) as executor:
futures = {executor.submit(handshake_peer_wrapper, peer): peer for peer in all_peers}
for future in as_completed(futures):
result = future.result()
if result is not None:
sockets.append(result)

return sockets, hdr

def flatten_messages(self,
Expand Down Expand Up @@ -240,11 +253,11 @@ def create_publish_tasks(self, sockets: List[Dict[str, Any]],

def consume_and_discard(self, socket_info):
sock = socket_info['socket']
peer = socket_info['peer']
# peer = socket_info['peer']
while True:
if self.read_socket(sock, 1024) is None:
time.sleep(0.1)
print(f"No data from {peer}. Wait 100ms")
time.sleep(0.25)
# print(f"No data from {peer}. Wait 100ms")

def read_socket(self, sock, byte_count: int) -> bytes or None:
try:
Expand Down
3 changes: 1 addition & 2 deletions nanolab/publisher/test_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
from nanolab.publisher.block_generator import BlockGenerator
from nanolab.publisher.confirmation_stats import ConfirmationStatsManager
from nanolab.src.utils import get_config_parser, print_dot
import time
from nanolab.publisher.event_bus import EventBus
from nanolab.publisher.block_event import BlockConfirmationEvent



class ITestCase(ABC):
Expand Down
21 changes: 12 additions & 9 deletions nanolab/xnomin/handshake.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
from nanolab.xnomin.peers import message_header, py_ed25519_blake2b, socket, hexlify, message_type
from secrets import token_bytes
from typing import Tuple
import time


def read_socket(sock: socket.socket, byte_count: int) -> bytes or None:
def read_socket(sock: socket.socket, byte_count: int, timeout: float = 30.0) -> bytes or None:
try:
sock.settimeout(timeout) # Set timeout for socket operations
data = bytearray()
start_time = time.time()
while len(data) < byte_count:
data += sock.recv(1)
if len(data) == 0:
raise ValueError('SocketClosedByPeer read_socket: data=%s' %
data)

if time.time() - start_time > timeout:
raise socket.timeout("Timeout while reading socket")
chunk = sock.recv(min(byte_count - len(data), 1024))
if not chunk:
raise ValueError('SocketClosedByPeer read_socket: data=%s' % data)
data += chunk
return bytes(data)

except OSError as msg:
print('read_socket] Error reading %d bytes, data=%s, msg=%s' %
except (OSError, socket.timeout) as msg:
print('[read_socket] Error reading %d bytes, data=%s, msg=%s' %
(byte_count, hexlify(data), msg))
return None

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
long_description = fh.read()

setup(name="nanolab",
version="0.0.26",
version="0.0.27",
author="gr0vity",
description="testing tool using nanomock",
long_description=long_description,
Expand Down

0 comments on commit f664563

Please sign in to comment.