Skip to content

Commit

Permalink
Extract vote_rebroadcaster (#4830)
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev authored Jan 21, 2025
1 parent 19e1cc6 commit 99f95a1
Show file tree
Hide file tree
Showing 10 changed files with 202 additions and 19 deletions.
4 changes: 2 additions & 2 deletions nano/core_test/vote_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ TEST (vote_processor, local_broadcast_without_a_representative)
ASSERT_NE (votes.end (), existing);
ASSERT_EQ (vote->timestamp (), existing->second.timestamp);
// Ensure the vote was broadcast
ASSERT_EQ (1, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
ASSERT_EQ (1, node.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::out));
ASSERT_TIMELY_EQ (5s, 1, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
ASSERT_TIMELY_EQ (5s, 1, node.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::out));
}

// Issue that tracks last changes on this test: https://github.com/nanocurrency/nano-node/issues/3485
Expand Down
5 changes: 5 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ enum class type
vote_processor,
vote_processor_tier,
vote_processor_overfill,
vote_rebroadcaster,
election,
election_cleanup,
election_vote,
Expand Down Expand Up @@ -172,6 +173,7 @@ enum class detail
queued,
error,
failed,
refresh,

// processing queue
queue,
Expand Down Expand Up @@ -655,6 +657,9 @@ enum class detail
pruned_count,
collect_targets,

// vote_rebroadcaster
rebroadcast_hashes,

_last // Must be the last enum
};

Expand Down
3 changes: 3 additions & 0 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::vote_cache_processing:
thread_role_name_string = "Vote cache proc";
break;
case nano::thread_role::name::vote_rebroadcasting:
thread_role_name_string = "Vote rebroad";
break;
case nano::thread_role::name::block_processing:
thread_role_name_string = "Blck processing";
break;
Expand Down
1 change: 1 addition & 0 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ enum class name
message_processing,
vote_processing,
vote_cache_processing,
vote_rebroadcasting,
block_processing,
ledger_notifications,
request_loop,
Expand Down
2 changes: 2 additions & 0 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ add_library(
vote_generator.cpp
vote_processor.hpp
vote_processor.cpp
vote_rebroadcaster.hpp
vote_rebroadcaster.cpp
vote_router.hpp
vote_router.cpp
vote_spacing.hpp
Expand Down
4 changes: 2 additions & 2 deletions nano/node/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,9 @@ class telemetry;
class unchecked_map;
class stats;
class vote_cache;
enum class vote_code;
enum class vote_source;
class vote_generator;
class vote_processor;
class vote_rebroadcaster;
class vote_router;
class vote_spacing;
class wallets;
Expand All @@ -52,6 +51,7 @@ enum class block_source;
enum class election_behavior;
enum class election_state;
enum class vote_code;
enum class vote_source;
}

namespace nano::scheduler
Expand Down
21 changes: 6 additions & 15 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include <nano/node/transport/tcp_listener.hpp>
#include <nano/node/vote_generator.hpp>
#include <nano/node/vote_processor.hpp>
#include <nano/node/vote_rebroadcaster.hpp>
#include <nano/node/vote_router.hpp>
#include <nano/node/websocket.hpp>
#include <nano/secure/ledger.hpp>
Expand Down Expand Up @@ -200,6 +201,8 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
http_callbacks{ *http_callbacks_impl },
pruning_impl{ std::make_unique<nano::pruning> (config, flags, ledger, stats, logger) },
pruning{ *pruning_impl },
vote_rebroadcaster_impl{ std::make_unique<nano::vote_rebroadcaster> (vote_router, network, wallets, stats, logger) },
vote_rebroadcaster{ *vote_rebroadcaster_impl },
startup_time{ std::chrono::steady_clock::now () },
node_seq{ seq }
{
Expand All @@ -219,21 +222,6 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
}
});

// Republish vote if it is new and the node does not host a principal representative (or close to)
vote_router.vote_processed.add ([this] (std::shared_ptr<nano::vote> const & vote, nano::vote_source source, std::unordered_map<nano::block_hash, nano::vote_code> const & results) {
bool processed = std::any_of (results.begin (), results.end (), [] (auto const & result) {
return result.second == nano::vote_code::vote;
});
if (processed)
{
auto const reps = wallets.reps ();
if (!reps.have_half_rep () && !reps.exists (vote->account))
{
network.flood_vote (vote, 0.5f, /* rebroadcasted */ true);
}
}
});

// Do some cleanup due to this block never being processed by confirmation height processor
confirming_set.cementing_failed.add ([this] (auto const & hash) {
active.recently_confirmed.erase (hash);
Expand Down Expand Up @@ -571,6 +559,7 @@ void nano::node::start ()
monitor.start ();
http_callbacks.start ();
pruning.start ();
vote_rebroadcaster.start ();

add_initial_peers ();
}
Expand Down Expand Up @@ -621,6 +610,7 @@ void nano::node::stop ()
monitor.stop ();
http_callbacks.stop ();
pruning.stop ();
vote_rebroadcaster.stop ();

bootstrap_workers.stop ();
wallet_workers.stop ();
Expand Down Expand Up @@ -989,6 +979,7 @@ nano::container_info nano::node::container_info () const
info.add ("bounded_backlog", backlog.container_info ());
info.add ("http_callbacks", http_callbacks.container_info ());
info.add ("pruning", pruning.container_info ());
info.add ("vote_rebroadcaster", vote_rebroadcaster.container_info ());
return info;
}

Expand Down
2 changes: 2 additions & 0 deletions nano/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ class node final : public std::enable_shared_from_this<node>
nano::http_callbacks & http_callbacks;
std::unique_ptr<nano::pruning> pruning_impl;
nano::pruning & pruning;
std::unique_ptr<nano::vote_rebroadcaster> vote_rebroadcaster_impl;
nano::vote_rebroadcaster & vote_rebroadcaster;

public:
std::chrono::steady_clock::time_point const startup_time;
Expand Down
130 changes: 130 additions & 0 deletions nano/node/vote_rebroadcaster.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#include <nano/lib/assert.hpp>
#include <nano/lib/interval.hpp>
#include <nano/lib/thread_roles.hpp>
#include <nano/node/network.hpp>
#include <nano/node/vote_processor.hpp>
#include <nano/node/vote_rebroadcaster.hpp>
#include <nano/node/vote_router.hpp>
#include <nano/node/wallet.hpp>
#include <nano/secure/vote.hpp>

nano::vote_rebroadcaster::vote_rebroadcaster (nano::vote_router & vote_router_a, nano::network & network_a, nano::wallets & wallets_a, nano::stats & stats_a, nano::logger & logger_a) :
vote_router{ vote_router_a },
network{ network_a },
wallets{ wallets_a },
stats{ stats_a },
logger{ logger_a }
{
vote_router.vote_processed.add ([this] (std::shared_ptr<nano::vote> const & vote, nano::vote_source source, std::unordered_map<nano::block_hash, nano::vote_code> const & results) {
bool processed = std::any_of (results.begin (), results.end (), [] (auto const & result) {
return result.second == nano::vote_code::vote;
});
if (processed && enable)
{
put (vote);
}
});
}

nano::vote_rebroadcaster::~vote_rebroadcaster ()
{
debug_assert (!thread.joinable ());
}

void nano::vote_rebroadcaster::start ()
{
debug_assert (!thread.joinable ());

thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::vote_rebroadcasting);
run ();
});
}

void nano::vote_rebroadcaster::stop ()
{
{
std::lock_guard guard{ mutex };
stopped = true;
}
condition.notify_all ();
if (thread.joinable ())
{
thread.join ();
}
}

bool nano::vote_rebroadcaster::put (std::shared_ptr<nano::vote> const & vote)
{
bool added{ false };
{
std::lock_guard guard{ mutex };
if (queue.size () < max_queue)
{
if (!reps.exists (vote->account))
{
queue.push_back (vote);
added = true;
}
}
}
if (added)
{
stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::queued);
condition.notify_one ();
}
else
{
stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::overfill);
}
return added;
}

void nano::vote_rebroadcaster::run ()
{
std::unique_lock lock{ mutex };
while (!stopped)
{
condition.wait (lock, [&] {
return stopped || !queue.empty ();
});

if (stopped)
{
return;
}

stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::loop);

if (refresh_interval.elapse (15s))
{
stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::refresh);

reps = wallets.reps ();
enable = !reps.have_half_rep (); // Disable vote rebroadcasting if the node has a principal representative (or close to)
}

if (!queue.empty ())
{
auto vote = queue.front ();
queue.pop_front ();

lock.unlock ();

stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::rebroadcast);
stats.add (nano::stat::type::vote_rebroadcaster, nano::stat::detail::rebroadcast_hashes, vote->hashes.size ());
network.flood_vote (vote, 0.5f, /* rebroadcasted */ true); // TODO: Track number of peers that we sent the vote to

lock.lock ();
}
}
}

nano::container_info nano::vote_rebroadcaster::container_info () const
{
std::lock_guard guard{ mutex };

nano::container_info info;
info.put ("queue", queue.size ());
return info;
}
49 changes: 49 additions & 0 deletions nano/node/vote_rebroadcaster.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#pragma once

#include <nano/node/fwd.hpp>
#include <nano/node/wallet.hpp>

#include <atomic>
#include <condition_variable>
#include <deque>
#include <thread>

namespace nano
{
class vote_rebroadcaster final
{
public:
static size_t constexpr max_queue = 1024 * 16;

public:
vote_rebroadcaster (nano::vote_router &, nano::network &, nano::wallets &, nano::stats &, nano::logger &);
~vote_rebroadcaster ();

void start ();
void stop ();

bool put (std::shared_ptr<nano::vote> const &);

nano::container_info container_info () const;

public: // Dependencies
nano::vote_router & vote_router;
nano::network & network;
nano::wallets & wallets;
nano::stats & stats;
nano::logger & logger;

private:
void run ();

std::atomic<bool> enable{ true }; // Enable vote rebroadcasting only if the node does not host a representative
std::deque<std::shared_ptr<nano::vote>> queue;
nano::wallet_representatives reps;
nano::interval refresh_interval;

bool stopped{ false };
std::condition_variable condition;
mutable std::mutex mutex;
std::thread thread;
};
}

0 comments on commit 99f95a1

Please sign in to comment.