Skip to content

Commit

Permalink
Merge pull request #4786 from pwojcikdev/traffic-shaping-pr
Browse files Browse the repository at this point in the history
Traffic shaping
  • Loading branch information
pwojcikdev authored Dec 22, 2024
2 parents 878e7a1 + 4146cf6 commit db23b9a
Show file tree
Hide file tree
Showing 59 changed files with 1,148 additions and 698 deletions.
9 changes: 4 additions & 5 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ set(NANO_FUZZER_TEST
OFF
CACHE BOOL "")
set(NANO_ASIO_HANDLER_TRACKING
0
CACHE STRING "")
OFF
CACHE BOOL "")
set(NANO_ROCKSDB_TOOLS
OFF
CACHE BOOL "")
Expand Down Expand Up @@ -153,9 +153,8 @@ if(${NANO_TIMED_LOCKS} GREATER 0)
endif()
endif()

if(${NANO_ASIO_HANDLER_TRACKING} GREATER 0)
add_definitions(-DNANO_ASIO_HANDLER_TRACKING=${NANO_ASIO_HANDLER_TRACKING}
-DBOOST_ASIO_ENABLE_HANDLER_TRACKING)
if(NANO_ASIO_HANDLER_TRACKING)
add_definitions(-DBOOST_ASIO_ENABLE_HANDLER_TRACKING)
endif()

option(NANO_SIMD_OPTIMIZATIONS
Expand Down
4 changes: 2 additions & 2 deletions nano/core_test/active_elections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,7 @@ TEST (active_elections, fork_replacement_tally)
node_config.peering_port = system.get_available_port ();
auto & node2 (*system.add_node (node_config));
node1.network.filter.clear ();
node2.network.flood_block (send_last);
node2.network.flood_block (send_last, nano::transport::traffic_type::test);
ASSERT_TIMELY (3s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 0);

// Correct block without votes is ignored
Expand All @@ -984,7 +984,7 @@ TEST (active_elections, fork_replacement_tally)
// ensure vote arrives before the block
ASSERT_TIMELY_EQ (5s, 1, node1.vote_cache.find (send_last->hash ()).size ());
node1.network.filter.clear ();
node2.network.flood_block (send_last);
node2.network.flood_block (send_last, nano::transport::traffic_type::test);
ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 1);

// the send_last block should replace one of the existing block of the election because it has higher vote weight
Expand Down
86 changes: 43 additions & 43 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ TEST (network, last_contacted)

{
// check that the endpoints are part of the same connection
std::shared_ptr<nano::transport::tcp_socket> sock0 = channel0->socket.lock ();
std::shared_ptr<nano::transport::tcp_socket> sock1 = channel1->socket.lock ();
std::shared_ptr<nano::transport::tcp_socket> sock0 = channel0->socket;
std::shared_ptr<nano::transport::tcp_socket> sock1 = channel1->socket;
ASSERT_EQ (sock0->local_endpoint (), sock1->remote_endpoint ());
ASSERT_EQ (sock1->local_endpoint (), sock0->remote_endpoint ());
}
Expand Down Expand Up @@ -195,7 +195,7 @@ TEST (network, send_discarded_publish)
.build ();
{
auto transaction = node1.ledger.tx_begin_read ();
node1.network.flood_block (block);
node1.network.flood_block (block, nano::transport::traffic_type::test);
ASSERT_EQ (nano::dev::genesis->hash (), node1.ledger.any.account_head (transaction, nano::dev::genesis_key.pub));
ASSERT_EQ (nano::dev::genesis->hash (), node2.latest (nano::dev::genesis_key.pub));
}
Expand All @@ -221,7 +221,7 @@ TEST (network, send_invalid_publish)
.build ();
{
auto transaction = node1.ledger.tx_begin_read ();
node1.network.flood_block (block);
node1.network.flood_block (block, nano::transport::traffic_type::test);
ASSERT_EQ (nano::dev::genesis->hash (), node1.ledger.any.account_head (transaction, nano::dev::genesis_key.pub));
ASSERT_EQ (nano::dev::genesis->hash (), node2.latest (nano::dev::genesis_key.pub));
}
Expand Down Expand Up @@ -306,7 +306,7 @@ TEST (network, send_insufficient_work)
nano::publish publish1{ nano::dev::network_params.network, block1 };
auto tcp_channel (node1.network.tcp_channels.find_node_id (node2.get_node_id ()));
ASSERT_NE (nullptr, tcp_channel);
tcp_channel->send (publish1, [] (boost::system::error_code const & ec, size_t size) {});
tcp_channel->send (publish1, nano::transport::traffic_type::test);
ASSERT_EQ (0, node1.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work));
ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work) != 0);
ASSERT_EQ (1, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work));
Expand All @@ -320,7 +320,7 @@ TEST (network, send_insufficient_work)
.work (system.work_generate_limited (block1->hash (), node1.network_params.work.epoch_2_receive, node1.network_params.work.epoch_1 - 1))
.build ();
nano::publish publish2{ nano::dev::network_params.network, block2 };
tcp_channel->send (publish2, [] (boost::system::error_code const & ec, size_t size) {});
tcp_channel->send (publish2, nano::transport::traffic_type::test);
ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work) != 1);
ASSERT_EQ (2, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work));
// Legacy block work epoch_1
Expand All @@ -333,7 +333,7 @@ TEST (network, send_insufficient_work)
.work (*system.work.generate (block2->hash (), node1.network_params.work.epoch_2))
.build ();
nano::publish publish3{ nano::dev::network_params.network, block3 };
tcp_channel->send (publish3, [] (boost::system::error_code const & ec, size_t size) {});
tcp_channel->send (publish3, nano::transport::traffic_type::test);
ASSERT_EQ (0, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in));
ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) != 0);
ASSERT_EQ (1, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in));
Expand All @@ -349,7 +349,7 @@ TEST (network, send_insufficient_work)
.work (system.work_generate_limited (block1->hash (), node1.network_params.work.epoch_2_receive, node1.network_params.work.epoch_1 - 1))
.build ();
nano::publish publish4{ nano::dev::network_params.network, block4 };
tcp_channel->send (publish4, [] (boost::system::error_code const & ec, size_t size) {});
tcp_channel->send (publish4, nano::transport::traffic_type::test);
ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) != 0);
ASSERT_EQ (1, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in));
ASSERT_EQ (2, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work));
Expand Down Expand Up @@ -562,14 +562,14 @@ TEST (network, peer_max_tcp_attempts)
node_config.network.max_peers_per_ip = 3;
auto node = system.add_node (node_config, node_flags);

for (auto i (0); i < node_config.network.max_peers_per_ip; ++i)
for (auto i = 0; i < node_config.network.max_peers_per_ip; ++i)
{
auto node2 (std::make_shared<nano::node> (system.io_ctx, system.get_available_port (), nano::unique_path (), system.work, node_flags));
node2->start ();
system.nodes.push_back (node2);

// Start TCP attempt
node->network.merge_peer (node2->network.endpoint ());
// Disable reachout from temporary nodes to avoid mixing outbound and inbound connections
nano::node_config temp_config = system.default_config ();
temp_config.network.peer_reachout = {};
temp_config.network.cached_peer_reachout = {};
auto temp_node = system.make_disconnected_node (temp_config, node_flags);
ASSERT_TRUE (node->network.merge_peer (temp_node->network.endpoint ()));
}

ASSERT_TIMELY_EQ (15s, node->network.size (), node_config.network.max_peers_per_ip);
Expand Down Expand Up @@ -632,9 +632,9 @@ TEST (network, duplicate_detection)
ASSERT_NE (nullptr, tcp_channel);

ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message));
tcp_channel->send (publish);
tcp_channel->send (publish, nano::transport::traffic_type::test);
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message), 0);
tcp_channel->send (publish);
tcp_channel->send (publish, nano::transport::traffic_type::test);
ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message), 1);
}

Expand Down Expand Up @@ -681,9 +681,9 @@ TEST (network, duplicate_vote_detection)
ASSERT_NE (nullptr, tcp_channel);

ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message));
tcp_channel->send (message);
tcp_channel->send (message, nano::transport::traffic_type::test);
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0);
tcp_channel->send (message);
tcp_channel->send (message, nano::transport::traffic_type::test);
ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 1);
}

Expand Down Expand Up @@ -711,12 +711,12 @@ TEST (network, duplicate_revert_vote)
ASSERT_NE (nullptr, tcp_channel);

// First vote should be processed
tcp_channel->send (message1);
tcp_channel->send (message1, nano::transport::traffic_type::test);
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0);
ASSERT_TIMELY (5s, node1.network.filter.check (bytes1.data (), bytes1.size ()));

// Second vote should get dropped from processor queue
tcp_channel->send (message2);
tcp_channel->send (message2, nano::transport::traffic_type::test);
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0);
// And the filter should not have it
WAIT (500ms); // Give the node time to process the vote
Expand All @@ -741,9 +741,9 @@ TEST (network, expire_duplicate_filter)

// Send a vote
ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message));
tcp_channel->send (message);
tcp_channel->send (message, nano::transport::traffic_type::test);
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0);
tcp_channel->send (message);
tcp_channel->send (message, nano::transport::traffic_type::test);
ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 1);

// The filter should expire the vote after some time
Expand All @@ -752,7 +752,7 @@ TEST (network, expire_duplicate_filter)
}

// The test must be completed in less than 1 second
TEST (network, bandwidth_limiter_4_messages)
TEST (network, DISABLED_bandwidth_limiter_4_messages)
{
nano::test::system system;
nano::publish message{ nano::dev::network_params.network, nano::dev::genesis };
Expand All @@ -767,22 +767,22 @@ TEST (network, bandwidth_limiter_4_messages)
// Send droppable messages
for (auto i = 0; i < message_limit; i += 2) // number of channels
{
channel1.send (message);
channel2.send (message);
channel1.send (message, nano::transport::traffic_type::test);
channel2.send (message, nano::transport::traffic_type::test);
}
// Only sent messages below limit, so we don't expect any drops
ASSERT_TIMELY_EQ (1s, 0, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));

// Send droppable message; drop stats should increase by one now
channel1.send (message);
channel1.send (message, nano::transport::traffic_type::test);
ASSERT_TIMELY_EQ (1s, 1, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));

// Send non-droppable message, i.e. drop stats should not increase
channel2.send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop);
channel2.send (message, nano::transport::traffic_type::test);
ASSERT_TIMELY_EQ (1s, 1, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));
}

TEST (network, bandwidth_limiter_2_messages)
TEST (network, DISABLED_bandwidth_limiter_2_messages)
{
nano::test::system system;
nano::publish message{ nano::dev::network_params.network, nano::dev::genesis };
Expand All @@ -795,10 +795,10 @@ TEST (network, bandwidth_limiter_2_messages)
nano::transport::inproc::channel channel1{ node, node };
nano::transport::inproc::channel channel2{ node, node };
// change the bandwidth settings, 2 packets will be dropped
channel1.send (message);
channel2.send (message);
channel1.send (message);
channel2.send (message);
channel1.send (message, nano::transport::traffic_type::test);
channel2.send (message, nano::transport::traffic_type::test);
channel1.send (message, nano::transport::traffic_type::test);
channel2.send (message, nano::transport::traffic_type::test);
ASSERT_TIMELY_EQ (1s, 2, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));
}

Expand All @@ -815,10 +815,10 @@ TEST (network, bandwidth_limiter_with_burst)
nano::transport::inproc::channel channel1{ node, node };
nano::transport::inproc::channel channel2{ node, node };
// change the bandwidth settings, no packet will be dropped
channel1.send (message);
channel2.send (message);
channel1.send (message);
channel2.send (message);
channel1.send (message, nano::transport::traffic_type::test);
channel2.send (message, nano::transport::traffic_type::test);
channel1.send (message, nano::transport::traffic_type::test);
channel2.send (message, nano::transport::traffic_type::test);
ASSERT_TIMELY_EQ (1s, 0, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));
}

Expand Down Expand Up @@ -962,7 +962,7 @@ TEST (network, filter_invalid_network_bytes)
// send a keepalive, from node2 to node1, with the wrong network bytes
nano::keepalive keepalive{ nano::dev::network_params.network };
const_cast<nano::networks &> (keepalive.header.network) = nano::networks::invalid;
channel->send (keepalive);
channel->send (keepalive, nano::transport::traffic_type::test);

ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::error, nano::stat::detail::invalid_network));
}
Expand All @@ -981,7 +981,7 @@ TEST (network, filter_invalid_version_using)
// send a keepalive, from node2 to node1, with the wrong version_using
nano::keepalive keepalive{ nano::dev::network_params.network };
const_cast<uint8_t &> (keepalive.header.version_using) = nano::dev::network_params.network.protocol_version_min - 1;
channel->send (keepalive);
channel->send (keepalive, nano::transport::traffic_type::test);

ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::error, nano::stat::detail::outdated_version));
}
Expand Down Expand Up @@ -1068,8 +1068,8 @@ TEST (network, purge_dead_channel)

auto & node1 = *system.add_node (flags);

node1.observers.socket_connected.add ([&] (nano::transport::tcp_socket & sock) {
system.logger.debug (nano::log::type::test, "Connected: {}", sock);
node1.observers.socket_connected.add ([&] (auto const & socket) {
system.logger.debug (nano::log::type::test, "Connected socket: {}", nano::streamed (socket));
});

auto & node2 = *system.add_node (flags);
Expand Down Expand Up @@ -1119,8 +1119,8 @@ TEST (network, purge_dead_channel_remote)
auto & node1 = *system.add_node (flags);
auto & node2 = *system.add_node (flags);

node2.observers.socket_connected.add ([&] (nano::transport::tcp_socket & sock) {
system.logger.debug (nano::log::type::test, "Connected: {}", sock);
node2.observers.socket_connected.add ([&] (auto const & socket) {
system.logger.debug (nano::log::type::test, "Connected socket: {}", nano::streamed (socket));
});

ASSERT_EQ (node1.network.size (), 1);
Expand Down
16 changes: 2 additions & 14 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ TEST (node, confirm_locked)
.sign (nano::keypair ().prv, 0)
.work (0)
.build ();
system.nodes[0]->network.flood_block (block);
system.nodes[0]->network.flood_block (block, nano::transport::traffic_type::test);
}

TEST (node_config, random_rep)
Expand Down Expand Up @@ -1005,14 +1005,9 @@ TEST (node, fork_no_vote_quorum)
ASSERT_FALSE (system.wallet (1)->store.fetch (transaction, key1, key3));
auto vote = std::make_shared<nano::vote> (key1, key3, 0, 0, std::vector<nano::block_hash>{ send2->hash () });
nano::confirm_ack confirm{ nano::dev::network_params.network, vote };
std::vector<uint8_t> buffer;
{
nano::vectorstream stream (buffer);
confirm.serialize (stream);
}
auto channel = node2.network.find_node_id (node3.node_id.pub);
ASSERT_NE (nullptr, channel);
channel->send_buffer (nano::shared_const_buffer (std::move (buffer)));
channel->send (confirm, nano::transport::traffic_type::test);
ASSERT_TIMELY (10s, node3.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::in) >= 3);
ASSERT_EQ (node1.latest (nano::dev::genesis_key.pub), send1->hash ());
ASSERT_EQ (node2.latest (nano::dev::genesis_key.pub), send1->hash ());
Expand Down Expand Up @@ -2662,13 +2657,6 @@ TEST (node, dont_write_lock_node)

TEST (node, bidirectional_tcp)
{
#ifdef _WIN32
if (nano::rocksdb_config::using_rocksdb_in_tests ())
{
// Don't test this in rocksdb mode
GTEST_SKIP ();
}
#endif
nano::test::system system;
nano::node_flags node_flags;
// Disable bootstrap to start elections for new blocks
Expand Down
5 changes: 3 additions & 2 deletions nano/core_test/peer_container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ TEST (peer_container, reserved_ip_is_not_a_peer)

// Test the TCP channel cleanup function works properly. It is used to remove peers that are not
// exchanging messages after a while.
TEST (peer_container, tcp_channel_cleanup_works)
TEST (peer_container, DISABLED_tcp_channel_cleanup_works)
{
nano::test::system system;
nano::node_config node_config = system.default_config ();
Expand Down Expand Up @@ -90,6 +90,7 @@ TEST (peer_container, tcp_channel_cleanup_works)

for (auto it = 0; node1.network.tcp_channels.size () > 1 && it < 10; ++it)
{
// FIXME: This is racy and doesn't work reliably
// we can't control everything the nodes are doing in background, so using the middle time as
// the cutoff point.
auto const channel1_last_packet_sent = channel1->get_last_packet_sent ();
Expand Down Expand Up @@ -254,7 +255,7 @@ TEST (peer_container, depeer_on_outdated_version)
nano::keepalive keepalive{ nano::dev::network_params.network };
const_cast<uint8_t &> (keepalive.header.version_using) = nano::dev::network_params.network.protocol_version_min - 1;
ASSERT_TIMELY (5s, channel->alive ());
channel->send (keepalive);
channel->send (keepalive, nano::transport::traffic_type::test);

ASSERT_TIMELY (5s, !channel->alive ());
}
2 changes: 1 addition & 1 deletion nano/core_test/rep_crawler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ TEST (rep_crawler, ignore_rebroadcasted)

auto tick = [&] () {
nano::confirm_ack msg{ nano::dev::network_params.network, vote, /* rebroadcasted */ true };
channel2to1->send (msg, nullptr, nano::transport::buffer_drop_policy::no_socket_drop);
channel2to1->send (msg, nano::transport::traffic_type::test);
return false;
};

Expand Down
Loading

0 comments on commit db23b9a

Please sign in to comment.