diff --git a/nano/core_test/ledger.cpp b/nano/core_test/ledger.cpp index b42e31cb7d..850ddab934 100644 --- a/nano/core_test/ledger.cpp +++ b/nano/core_test/ledger.cpp @@ -1,7 +1,6 @@ -#include "nano/lib/numbers.hpp" - #include #include +#include #include #include #include @@ -5644,3 +5643,91 @@ TEST (ledger_receivable, any_one) ASSERT_TRUE (ctx.ledger ().any.receivable_exists (ctx.ledger ().tx_begin_read (), nano::dev::genesis_key.pub)); ASSERT_FALSE (ctx.ledger ().any.receivable_exists (ctx.ledger ().tx_begin_read (), key.pub)); } + +TEST (ledger_transaction, write_refresh) +{ + auto ctx = nano::test::context::ledger_empty (); + nano::block_builder builder; + nano::keypair key; + auto send1 = builder + .state () + .account (nano::dev::genesis_key.pub) + .previous (nano::dev::genesis->hash ()) + .representative (nano::dev::genesis_key.pub) + .balance (nano::dev::constants.genesis_amount - nano::Gxrb_ratio) + .link (nano::dev::genesis_key.pub) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*ctx.pool ().generate (nano::dev::genesis->hash ())) + .build (); + auto send2 = builder + .state () + .account (nano::dev::genesis_key.pub) + .previous (send1->hash ()) + .representative (nano::dev::genesis_key.pub) + .balance (nano::dev::constants.genesis_amount - 2 * nano::Gxrb_ratio) + .link (key.pub) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*ctx.pool ().generate (send1->hash ())) + .build (); + + auto transaction = ctx.ledger ().tx_begin_write (); + ASSERT_EQ (nano::block_status::progress, ctx.ledger ().process (transaction, send1)); + // Force refresh + ASSERT_TRUE (transaction.refresh_if_needed (0ms)); + ASSERT_FALSE (transaction.refresh_if_needed ()); // Should not refresh again too soon + // Refreshed transaction should work just fine + ASSERT_EQ (nano::block_status::progress, ctx.ledger ().process (transaction, send2)); +} + +TEST (ledger_transaction, write_wait_order) +{ + nano::test::system system; + + auto ctx = nano::test::context::ledger_empty (); + + std::atomic acquired1{ false }; + std::atomic acquired2{ false }; + std::atomic acquired3{ false }; + + std::latch latch1{ 1 }; + std::latch latch2{ 1 }; + std::latch latch3{ 1 }; + + auto fut1 = std::async (std::launch::async, [&] { + auto tx = ctx.ledger ().tx_begin_write ({}, nano::store::writer::generic); + acquired1 = true; + latch1.wait (); // Wait for the signal to drop tx + }); + WAIT (250ms); // Allow thread to start + + auto fut2 = std::async (std::launch::async, [&ctx, &acquired2, &latch2] { + auto tx = ctx.ledger ().tx_begin_write ({}, nano::store::writer::blockprocessor); + acquired2 = true; + latch2.wait (); // Wait for the signal to drop tx + }); + WAIT (250ms); // Allow thread to start + + auto fut3 = std::async (std::launch::async, [&ctx, &acquired3, &latch3] { + auto tx = ctx.ledger ().tx_begin_write ({}, nano::store::writer::confirmation_height); + acquired3 = true; + latch3.wait (); // Wait for the signal to drop tx + }); + WAIT (250ms); // Allow thread to start + + // First transaction should be ready immediately, others should be waiting + ASSERT_TIMELY (5s, acquired1.load ()); + ASSERT_NEVER (250ms, acquired2.load ()); + ASSERT_NEVER (250ms, acquired3.load ()); + + // Signal to continue and drop the first transaction + latch1.count_down (); + ASSERT_TIMELY (5s, acquired2.load ()); + ASSERT_NEVER (250ms, acquired3.load ()); + + // Signal to continue and drop the second transaction + latch2.count_down (); + ASSERT_TIMELY (5s, acquired3.load ()); + + // Signal to continue and drop the third transaction + latch3.count_down (); +} \ No newline at end of file diff --git a/nano/core_test/message.cpp b/nano/core_test/message.cpp index 1e9728b65f..e093aa6eec 100644 --- a/nano/core_test/message.cpp +++ b/nano/core_test/message.cpp @@ -26,6 +26,31 @@ std::shared_ptr random_block () } } +TEST (message, header_version) +{ + // Simplest message type + nano::keepalive original{ nano::dev::network_params.network }; + + // Serialize the original keepalive message + std::vector bytes; + { + nano::vectorstream stream (bytes); + original.serialize (stream); + } + + // Deserialize the byte stream back to a message header + nano::bufferstream stream (bytes.data (), bytes.size ()); + bool error = false; + nano::message_header header (error, stream); + ASSERT_FALSE (error); + + // Check header versions + ASSERT_EQ (nano::dev::network_params.network.protocol_version_min, header.version_min); + ASSERT_EQ (nano::dev::network_params.network.protocol_version, header.version_using); + ASSERT_EQ (nano::dev::network_params.network.protocol_version, header.version_max); + ASSERT_EQ (nano::message_type::keepalive, header.type); +} + TEST (message, keepalive_serialization) { nano::keepalive request1{ nano::dev::network_params.network }; @@ -62,33 +87,60 @@ TEST (message, keepalive_deserialize) ASSERT_EQ (message1.peers, message2.peers); } -TEST (message, publish_serialization) +TEST (message, publish) { + // Create a random block auto block = random_block (); - nano::publish publish{ nano::dev::network_params.network, block }; - ASSERT_EQ (nano::block_type::send, publish.header.block_type ()); + nano::publish original{ nano::dev::network_params.network, block }; + ASSERT_FALSE (original.is_originator ()); + + // Serialize the original publish message std::vector bytes; { nano::vectorstream stream (bytes); - publish.header.serialize (stream); - } - ASSERT_EQ (8, bytes.size ()); - ASSERT_EQ (0x52, bytes[0]); - ASSERT_EQ (0x41, bytes[1]); - ASSERT_EQ (nano::dev::network_params.network.protocol_version, bytes[2]); - ASSERT_EQ (nano::dev::network_params.network.protocol_version, bytes[3]); - ASSERT_EQ (nano::dev::network_params.network.protocol_version_min, bytes[4]); - ASSERT_EQ (static_cast (nano::message_type::publish), bytes[5]); - ASSERT_EQ (0x00, bytes[6]); // extensions - ASSERT_EQ (static_cast (nano::block_type::send), bytes[7]); + original.serialize (stream); + } + + // Deserialize the byte stream back to a publish message nano::bufferstream stream (bytes.data (), bytes.size ()); - auto error (false); + bool error = false; nano::message_header header (error, stream); ASSERT_FALSE (error); - ASSERT_EQ (nano::dev::network_params.network.protocol_version_min, header.version_min); - ASSERT_EQ (nano::dev::network_params.network.protocol_version, header.version_using); - ASSERT_EQ (nano::dev::network_params.network.protocol_version, header.version_max); - ASSERT_EQ (nano::message_type::publish, header.type); + nano::publish deserialized (error, stream, header); + ASSERT_FALSE (error); + + // Assert that the original and deserialized messages are equal + ASSERT_EQ (original, deserialized); + ASSERT_EQ (*original.block, *deserialized.block); + ASSERT_EQ (original.is_originator (), deserialized.is_originator ()); +} + +TEST (message, publish_originator_flag) +{ + // Create a random block + auto block = random_block (); + nano::publish original{ nano::dev::network_params.network, block, /* originator */ true }; + ASSERT_TRUE (original.is_originator ()); + + // Serialize the original publish message + std::vector bytes; + { + nano::vectorstream stream (bytes); + original.serialize (stream); + } + + // Deserialize the byte stream back to a publish message + nano::bufferstream stream (bytes.data (), bytes.size ()); + bool error = false; + nano::message_header header (error, stream); + ASSERT_FALSE (error); + nano::publish deserialized (error, stream, header); + ASSERT_FALSE (error); + + // Assert that the originator flag is set correctly in both the original and deserialized messages + ASSERT_TRUE (deserialized.is_originator ()); + ASSERT_EQ (original, deserialized); + ASSERT_EQ (*original.block, *deserialized.block); } TEST (message, confirm_header_flags) diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 7428a0eaee..c226337ea0 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -171,6 +171,7 @@ enum class detail // block source live, + live_originator, bootstrap, bootstrap_legacy, unchecked, diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 80ecefeb62..123ab5aeae 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -54,6 +54,7 @@ nano::block_processor::block_processor (nano::node & node_a) : switch (origin.source) { case nano::block_source::live: + case nano::block_source::live_originator: return config.max_peer_queue; default: return config.max_system_queue; @@ -64,6 +65,7 @@ nano::block_processor::block_processor (nano::node & node_a) : switch (origin.source) { case nano::block_source::live: + case nano::block_source::live_originator: return config.priority_live; case nano::block_source::bootstrap: case nano::block_source::bootstrap_legacy: @@ -295,8 +297,7 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock { processed_batch_t processed; - auto scoped_write_guard = node.store.write_queue.wait (nano::store::writer::process_batch); - auto transaction = node.ledger.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights }); + auto transaction = node.ledger.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights }, nano::store::writer::blockprocessor); nano::timer timer_l; lock_a.lock (); diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 57de50362f..483329c5e4 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -27,6 +27,7 @@ enum class block_source { unknown = 0, live, + live_originator, bootstrap, bootstrap_legacy, unchecked, @@ -67,10 +68,10 @@ class block_processor final class context { public: - context (std::shared_ptr block, block_source source); + context (std::shared_ptr block, nano::block_source source); std::shared_ptr const block; - block_source const source; + nano::block_source const source; std::chrono::steady_clock::time_point const arrival{ std::chrono::steady_clock::now () }; public: @@ -85,18 +86,18 @@ class block_processor final }; public: - block_processor (nano::node &); + explicit block_processor (nano::node &); ~block_processor (); void start (); void stop (); std::size_t size () const; - std::size_t size (block_source) const; + std::size_t size (nano::block_source) const; bool full () const; bool half_full () const; - bool add (std::shared_ptr const &, block_source = block_source::live, std::shared_ptr const & channel = nullptr); - std::optional add_blocking (std::shared_ptr const & block, block_source); + bool add (std::shared_ptr const &, nano::block_source = nano::block_source::live, std::shared_ptr const & channel = nullptr); + std::optional add_blocking (std::shared_ptr const & block, nano::block_source); void force (std::shared_ptr const &); bool should_log (); @@ -128,7 +129,7 @@ class block_processor final nano::node & node; private: - nano::fair_queue queue; + nano::fair_queue queue; std::chrono::steady_clock::time_point next_log; diff --git a/nano/node/confirming_set.cpp b/nano/node/confirming_set.cpp index b577e3d5e4..ee72858b52 100644 --- a/nano/node/confirming_set.cpp +++ b/nano/node/confirming_set.cpp @@ -133,13 +133,13 @@ void nano::confirming_set::run_batch (std::unique_lock & lock) lock.unlock (); { - // TODO: Properly limiting batch times requires this combo to be wrapped in a single object that provides refresh functionality - auto guard = ledger.store.write_queue.wait (nano::store::writer::confirmation_height); - auto tx = ledger.tx_begin_write ({ nano::tables::confirmation_height }); + auto transaction = ledger.tx_begin_write ({ nano::tables::confirmation_height }, nano::store::writer::confirmation_height); for (auto const & hash : batch) { - auto added = ledger.confirm (tx, hash); + transaction.refresh_if_needed (); + + auto added = ledger.confirm (transaction, hash); if (!added.empty ()) { // Confirming this block may implicitly confirm more diff --git a/nano/node/message_processor.cpp b/nano/node/message_processor.cpp index 89902b43f4..a479b7bf7b 100644 --- a/nano/node/message_processor.cpp +++ b/nano/node/message_processor.cpp @@ -157,7 +157,6 @@ void nano::message_processor::run_batch (nano::unique_lock & lock) namespace { -// TODO: This was moved, so compare with latest develop before merging to avoid merge bugs class process_visitor : public nano::message_visitor { public: @@ -184,7 +183,9 @@ class process_visitor : public nano::message_visitor void publish (nano::publish const & message) override { - bool added = node.block_processor.add (message.block, nano::block_source::live, channel); + // Put blocks that are being initally broadcasted in a separate queue, so that they won't have to compete with rebroadcasted blocks + // Both queues have the same priority and size, so the potential for exploiting this is limited + bool added = node.block_processor.add (message.block, message.is_originator () ? nano::block_source::live_originator : nano::block_source::live, channel); if (!added) { node.network.publish_filter.clear (message.digest); diff --git a/nano/node/messages.cpp b/nano/node/messages.cpp index ac82d67b27..0143a62b2e 100644 --- a/nano/node/messages.cpp +++ b/nano/node/messages.cpp @@ -433,11 +433,12 @@ nano::publish::publish (bool & error_a, nano::stream & stream_a, nano::message_h } } -nano::publish::publish (nano::network_constants const & constants, std::shared_ptr const & block_a) : +nano::publish::publish (nano::network_constants const & constants, std::shared_ptr const & block_a, bool is_originator_a) : message (constants, nano::message_type::publish), block (block_a) { header.block_type_set (block->type ()); + header.flag_set (originator_flag, is_originator_a); } void nano::publish::serialize (nano::stream & stream_a) const @@ -465,11 +466,17 @@ bool nano::publish::operator== (nano::publish const & other_a) const return *block == *other_a.block; } +bool nano::publish::is_originator () const +{ + return header.flag_test (originator_flag); +} + void nano::publish::operator() (nano::object_stream & obs) const { nano::message::operator() (obs); // Write common data obs.write ("block", block); + obs.write ("originator", is_originator ()); } /* @@ -682,6 +689,7 @@ void nano::confirm_ack::operator() (nano::object_stream & obs) const nano::message::operator() (obs); // Write common data obs.write ("vote", vote); + obs.write ("rebroadcasted", is_rebroadcasted ()); } /* diff --git a/nano/node/messages.hpp b/nano/node/messages.hpp index 5046eae812..7cba524971 100644 --- a/nano/node/messages.hpp +++ b/nano/node/messages.hpp @@ -183,16 +183,23 @@ class keepalive final : public message * * Header extensions: * - [0x0f00] Block type: Identifies the specific type of the block. + * - [0x0004] Originator flag */ class publish final : public message { public: publish (bool &, nano::stream &, nano::message_header const &, nano::uint128_t const & = 0, nano::block_uniquer * = nullptr); - publish (nano::network_constants const & constants, std::shared_ptr const &); - void visit (nano::message_visitor &) const override; + publish (nano::network_constants const & constants, std::shared_ptr const &, bool is_originator = false); + void serialize (nano::stream &) const override; bool deserialize (nano::stream &, nano::block_uniquer * = nullptr); + void visit (nano::message_visitor &) const override; bool operator== (nano::publish const &) const; + + static uint8_t constexpr originator_flag = 2; // 0x0004 + bool is_originator () const; + +public: // Payload std::shared_ptr block; nano::uint128_t digest{ 0 }; diff --git a/nano/node/network.cpp b/nano/node/network.cpp index f741aa7f2f..1f4051017d 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -247,22 +247,22 @@ void nano::network::flood_keepalive_self (float const scale_a) flood_message (message, nano::transport::buffer_drop_policy::limiter, scale_a); } -void nano::network::flood_block (std::shared_ptr const & block_a, nano::transport::buffer_drop_policy const drop_policy_a) +void nano::network::flood_block (std::shared_ptr const & block, nano::transport::buffer_drop_policy const drop_policy) { - nano::publish message (node.network_params.network, block_a); - flood_message (message, drop_policy_a); + nano::publish message{ node.network_params.network, block }; + flood_message (message, drop_policy); } -void nano::network::flood_block_initial (std::shared_ptr const & block_a) +void nano::network::flood_block_initial (std::shared_ptr const & block) { - nano::publish message (node.network_params.network, block_a); - for (auto const & i : node.rep_crawler.principal_representatives ()) + nano::publish message{ node.network_params.network, block, /* is_originator */ true }; + for (auto const & rep : node.rep_crawler.principal_representatives ()) { - i.channel->send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop); + rep.channel->send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop); } - for (auto & i : list_non_pr (fanout (1.0))) + for (auto & peer : list_non_pr (fanout (1.0))) { - i->send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop); + peer->send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop); } } diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 33f07fe417..a2521e49cf 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -617,7 +617,7 @@ void nano::node::process_active (std::shared_ptr const & incoming) nano::block_status nano::node::process (std::shared_ptr block) { - auto const transaction = ledger.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights }); + auto const transaction = ledger.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights }, nano::store::writer::node); return process (transaction, block); } @@ -1033,8 +1033,7 @@ void nano::node::ledger_pruning (uint64_t const batch_size_a, bool bootstrap_wei transaction_write_count = 0; if (!pruning_targets.empty () && !stopped) { - auto scoped_write_guard = store.write_queue.wait (nano::store::writer::pruning); - auto write_transaction = ledger.tx_begin_write ({ tables::blocks, tables::pruned }); + auto write_transaction = ledger.tx_begin_write ({ tables::blocks, tables::pruned }, nano::store::writer::pruning); while (!pruning_targets.empty () && transaction_write_count < batch_size_a && !stopped) { auto const & pruning_hash (pruning_targets.front ()); diff --git a/nano/node/vote_generator.cpp b/nano/node/vote_generator.cpp index a0619d7ceb..4809e873ee 100644 --- a/nano/node/vote_generator.cpp +++ b/nano/node/vote_generator.cpp @@ -28,7 +28,7 @@ nano::vote_generator::vote_generator (nano::node_config const & config_a, nano:: stats (stats_a), logger (logger_a), is_final (is_final_a), - vote_generation_queue{ stats, nano::stat::type::vote_generator, nano::thread_role::name::vote_generator_queue, /* single threaded */ 1, /* max queue size */ 1024 * 32, /* max batch size */ 1024 * 4 }, + vote_generation_queue{ stats, nano::stat::type::vote_generator, nano::thread_role::name::vote_generator_queue, /* single threaded */ 1, /* max queue size */ 1024 * 32, /* max batch size */ 256 }, inproc_channel{ std::make_shared (node, node) } { vote_generation_queue.process_batch = [this] (auto & batch) { @@ -105,9 +105,15 @@ void nano::vote_generator::process_batch (std::deque & batch) { std::deque verified; - auto verify_batch = [this, &verified] (auto && transaction_variant, auto && batch) { + auto refresh_if_needed = [] (auto && transaction_variant) { + std::visit ([&] (auto && transaction) { transaction.refresh_if_needed (); }, transaction_variant); + }; + + auto verify_batch = [this, &verified, &refresh_if_needed] (auto && transaction_variant, auto && batch) { for (auto & [root, hash] : batch) { + refresh_if_needed (transaction_variant); + if (should_vote (transaction_variant, root, hash)) { verified.emplace_back (root, hash); @@ -117,8 +123,7 @@ void nano::vote_generator::process_batch (std::deque & batch) if (is_final) { - auto guard = ledger.store.write_queue.wait (nano::store::writer::voting_final); - transaction_variant_t transaction_variant{ ledger.tx_begin_write ({ tables::final_votes }) }; + transaction_variant_t transaction_variant{ ledger.tx_begin_write ({ tables::final_votes }, nano::store::writer::voting_final) }; verify_batch (transaction_variant, batch); diff --git a/nano/secure/ledger.cpp b/nano/secure/ledger.cpp index 481be0181e..f42f02aac2 100644 --- a/nano/secure/ledger.cpp +++ b/nano/secure/ledger.cpp @@ -732,9 +732,11 @@ nano::ledger::~ledger () { } -auto nano::ledger::tx_begin_write (std::vector const & tables_to_lock, std::vector const & tables_no_lock) const -> secure::write_transaction +auto nano::ledger::tx_begin_write (std::vector const & tables_to_lock, nano::store::writer guard_type) const -> secure::write_transaction { - return secure::write_transaction{ store.tx_begin_write (tables_to_lock, tables_no_lock) }; + auto guard = store.write_queue.wait (guard_type); + auto txn = store.tx_begin_write (tables_to_lock); + return secure::write_transaction{ std::move (txn), std::move (guard) }; } auto nano::ledger::tx_begin_read () const -> secure::read_transaction diff --git a/nano/secure/ledger.hpp b/nano/secure/ledger.hpp index 3089fa3c22..3e13256d29 100644 --- a/nano/secure/ledger.hpp +++ b/nano/secure/ledger.hpp @@ -39,7 +39,7 @@ class ledger final ~ledger (); /** Start read-write transaction */ - secure::write_transaction tx_begin_write (std::vector const & tables_to_lock = {}, std::vector const & tables_no_lock = {}) const; + secure::write_transaction tx_begin_write (std::vector const & tables_to_lock = {}, nano::store::writer guard_type = nano::store::writer::generic) const; /** Start read-only transaction */ secure::read_transaction tx_begin_read () const; diff --git a/nano/secure/transaction.hpp b/nano/secure/transaction.hpp index 619fdb37b4..5dd5e36d83 100644 --- a/nano/secure/transaction.hpp +++ b/nano/secure/transaction.hpp @@ -1,8 +1,9 @@ #pragma once -#include // Correct include for nano::store transaction classes +#include +#include -#include // For std::move +#include namespace nano::secure { @@ -31,11 +32,15 @@ class transaction class write_transaction : public transaction { nano::store::write_transaction txn; + nano::store::write_guard guard; + std::chrono::steady_clock::time_point start; public: - explicit write_transaction (nano::store::write_transaction && t) noexcept : - txn (std::move (t)) + explicit write_transaction (nano::store::write_transaction && txn, nano::store::write_guard && guard) noexcept : + txn{ std::move (txn) }, + guard{ std::move (guard) } { + start = std::chrono::steady_clock::now (); } // Override to return a reference to the encapsulated write_transaction @@ -47,16 +52,31 @@ class write_transaction : public transaction void commit () { txn.commit (); + guard.release (); } void renew () { + guard.renew (); txn.renew (); + start = std::chrono::steady_clock::now (); } void refresh () { - txn.refresh (); + commit (); + renew (); + } + + bool refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 }) + { + auto now = std::chrono::steady_clock::now (); + if (now - start > max_age) + { + refresh (); + return true; + } + return false; } // Conversion operator to const nano::store::transaction& @@ -78,7 +98,7 @@ class read_transaction : public transaction public: explicit read_transaction (nano::store::read_transaction && t) noexcept : - txn (std::move (t)) + txn{ std::move (t) } { } diff --git a/nano/store/component.hpp b/nano/store/component.hpp index 7cdd7d1fff..a64ae1a60f 100644 --- a/nano/store/component.hpp +++ b/nano/store/component.hpp @@ -81,8 +81,10 @@ namespace store store::final_vote & final_vote; store::version & version; + public: // TODO: Shouldn't be public store::write_queue write_queue; + public: virtual unsigned max_block_write_batch_num () const = 0; virtual bool copy_db (std::filesystem::path const & destination) = 0; diff --git a/nano/store/write_queue.cpp b/nano/store/write_queue.cpp index b50809a32c..154a62cee6 100644 --- a/nano/store/write_queue.cpp +++ b/nano/store/write_queue.cpp @@ -4,34 +4,30 @@ #include -nano::store::write_guard::write_guard (std::function guard_finish_callback_a) : - guard_finish_callback (guard_finish_callback_a) -{ -} +/* + * write_guard + */ -nano::store::write_guard::write_guard (write_guard && write_guard_a) noexcept : - guard_finish_callback (std::move (write_guard_a.guard_finish_callback)), - owns (write_guard_a.owns) +nano::store::write_guard::write_guard (write_queue & queue, writer type) : + queue{ queue }, + type{ type } { - write_guard_a.owns = false; - write_guard_a.guard_finish_callback = nullptr; + renew (); } -nano::store::write_guard & nano::store::write_guard::operator= (write_guard && write_guard_a) noexcept +nano::store::write_guard::write_guard (write_guard && other) noexcept : + queue{ other.queue }, + type{ other.type }, + owns{ other.owns } { - owns = write_guard_a.owns; - guard_finish_callback = std::move (write_guard_a.guard_finish_callback); - - write_guard_a.owns = false; - write_guard_a.guard_finish_callback = nullptr; - return *this; + other.owns = false; } nano::store::write_guard::~write_guard () { if (owns) { - guard_finish_callback (); + release (); } } @@ -42,84 +38,82 @@ bool nano::store::write_guard::is_owned () const void nano::store::write_guard::release () { - debug_assert (owns); - if (owns) - { - guard_finish_callback (); - } + release_assert (owns); + queue.release (type); owns = false; } +void nano::store::write_guard::renew () +{ + release_assert (!owns); + queue.acquire (type); + owns = true; +} + +/* + * write_queue + */ + nano::store::write_queue::write_queue (bool use_noops_a) : - guard_finish_callback ([use_noops_a, &queue = queue, &mutex = mutex, &cv = cv] () { - if (!use_noops_a) - { - { - nano::lock_guard guard (mutex); - queue.pop_front (); - } - cv.notify_all (); - } - }), - use_noops (use_noops_a) + use_noops{ use_noops_a } { } nano::store::write_guard nano::store::write_queue::wait (writer writer) { - if (use_noops) - { - return write_guard ([] {}); - } - - nano::unique_lock lk (mutex); - debug_assert (std::none_of (queue.cbegin (), queue.cend (), [writer] (auto const & item) { return item == writer; })); - // Add writer to the end of the queue if it's not already waiting - auto exists = std::find (queue.cbegin (), queue.cend (), writer) != queue.cend (); - if (!exists) - { - queue.push_back (writer); - } - cv.wait (lk, [&] () { return queue.front () == writer; }); - return write_guard (guard_finish_callback); + return write_guard{ *this, writer }; } -bool nano::store::write_queue::contains (writer writer) +bool nano::store::write_queue::contains (writer writer) const { debug_assert (!use_noops); - nano::lock_guard guard (mutex); + nano::lock_guard guard{ mutex }; return std::find (queue.cbegin (), queue.cend (), writer) != queue.cend (); } -bool nano::store::write_queue::process (writer writer) +void nano::store::write_queue::pop () { - if (use_noops) + nano::lock_guard guard{ mutex }; + if (!queue.empty ()) { - return true; + queue.pop_front (); } + condition.notify_all (); +} - auto result = false; +void nano::store::write_queue::acquire (writer writer) +{ + if (use_noops) { - nano::lock_guard guard (mutex); - // Add writer to the end of the queue if it's not already waiting - auto exists = std::find (queue.cbegin (), queue.cend (), writer) != queue.cend (); - if (!exists) - { - queue.push_back (writer); - } - - result = (queue.front () == writer); + return; // Pass immediately } - if (!result) + nano::unique_lock lock{ mutex }; + + // There should be no duplicates in the queue + debug_assert (std::none_of (queue.cbegin (), queue.cend (), [writer] (auto const & item) { return item == writer; })); + + // Add writer to the end of the queue if it's not already waiting + auto exists = std::find (queue.cbegin (), queue.cend (), writer) != queue.cend (); + if (!exists) { - cv.notify_all (); + queue.push_back (writer); } - return result; + condition.wait (lock, [&] () { return queue.front () == writer; }); } -nano::store::write_guard nano::store::write_queue::pop () +void nano::store::write_queue::release (writer writer) { - return write_guard (guard_finish_callback); -} + if (use_noops) + { + return; // Pass immediately + } + { + nano::lock_guard guard{ mutex }; + release_assert (!queue.empty ()); + release_assert (queue.front () == writer); + queue.pop_front (); + } + condition.notify_all (); +} \ No newline at end of file diff --git a/nano/store/write_queue.hpp b/nano/store/write_queue.hpp index 8875b2db35..e4f1fdc69a 100644 --- a/nano/store/write_queue.hpp +++ b/nano/store/write_queue.hpp @@ -11,28 +11,38 @@ namespace nano::store /** Distinct areas write locking is done, order is irrelevant */ enum class writer { + generic, + node, + blockprocessor, confirmation_height, - process_batch, pruning, voting_final, testing // Used in tests to emulate a write lock }; +class write_queue; + class write_guard final { public: - explicit write_guard (std::function guard_finish_callback_a); - void release (); + explicit write_guard (write_queue & queue, writer type); ~write_guard (); + write_guard (write_guard const &) = delete; write_guard & operator= (write_guard const &) = delete; write_guard (write_guard &&) noexcept; - write_guard & operator= (write_guard &&) noexcept; + write_guard & operator= (write_guard &&) noexcept = delete; + + void release (); + void renew (); + bool is_owned () const; + writer const type; + private: - std::function guard_finish_callback; - bool owns{ true }; + write_queue & queue; + bool owns{ false }; }; /** @@ -41,25 +51,31 @@ class write_guard final */ class write_queue final { + friend class write_guard; + public: - explicit write_queue (bool use_noops_a); + explicit write_queue (bool use_noops); + /** Blocks until we are at the head of the queue and blocks other waiters until write_guard goes out of scope */ [[nodiscard ("write_guard blocks other waiters")]] write_guard wait (writer writer); - /** Returns true if this writer is now at the front of the queue */ - bool process (writer writer); - /** Returns true if this writer is anywhere in the queue. Currently only used in tests */ - bool contains (writer writer); + bool contains (writer writer) const; /** Doesn't actually pop anything until the returned write_guard is out of scope */ - write_guard pop (); + void pop (); private: + void acquire (writer writer); + void release (writer writer); + +private: + bool const use_noops; + std::deque queue; - nano::mutex mutex; - nano::condition_variable cv; + mutable nano::mutex mutex; + nano::condition_variable condition; + std::function guard_finish_callback; - bool use_noops; }; } // namespace nano::store