Skip to content

Commit

Permalink
Yield lock while collecting retained messages to save
Browse files Browse the repository at this point in the history
This fixes stalls while it's collecting retained messages, at the cost
of the snapshot not being atomic. But, there is no real need to be
atomic while running.
  • Loading branch information
halfgaar committed Jan 1, 2024
1 parent a63778c commit 517d19e
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 16 deletions.
6 changes: 5 additions & 1 deletion FlashMQTests/retaintests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,11 @@ void MainTests::test_retained_tree_purging()
store->expireRetainedMessages();

std::vector<RetainedMessage> list;
store->getRetainedMessages(store->retainedMessagesRoot.get(), list);
const std::chrono::time_point<std::chrono::steady_clock> limit = std::chrono::steady_clock::now() + std::chrono::milliseconds(1000);
std::deque<std::weak_ptr<RetainedMessageNode>> deferred;
store->getRetainedMessages(store->retainedMessagesRoot.get(), list, limit, deferred);

QVERIFY(deferred.empty());

QVERIFY(std::none_of(list.begin(), list.end(), [](RetainedMessage &rm) {
return rm.publish.payload == "willexpire";
Expand Down
8 changes: 4 additions & 4 deletions mainapp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ void MainApp::saveStateInThread()
{
std::list<BridgeInfoForSerializing> bridgeInfos = BridgeInfoForSerializing::getBridgeInfosForSerializing(this->bridges);

auto f = std::bind(&MainApp::saveState, this->settings, bridgeInfos);
auto f = std::bind(&MainApp::saveState, this->settings, bridgeInfos, true);
this->bgWorker.addTask(f);


Expand Down Expand Up @@ -360,7 +360,7 @@ void MainApp::queueBridgeReconnectAllThreads(bool alsoQueueNexts)
* @param settings A local settings, copied from a std::bind copy when running in a thread, because of thread safety.
* @param bridgeInfos is a list of objects already prepared from the original bridge configs, to avoid concurrent access.
*/
void MainApp::saveState(const Settings &settings, const std::list<BridgeInfoForSerializing> &bridgeInfos)
void MainApp::saveState(const Settings &settings, const std::list<BridgeInfoForSerializing> &bridgeInfos, bool sleep_after_limit)
{
Logger *logger = Logger::getInstance();

Expand All @@ -372,7 +372,7 @@ void MainApp::saveState(const Settings &settings, const std::list<BridgeInfoForS

const std::string retainedDBPath = settings.getRetainedMessagesDBFile();
if (settings.retainedMessagesMode == RetainedMessagesMode::Enabled)
subscriptionStore->saveRetainedMessages(retainedDBPath);
subscriptionStore->saveRetainedMessages(retainedDBPath, sleep_after_limit);
else
logger->logf(LOG_INFO, "Not saving '%s', because 'retained_messages_mode' is not 'enabled'.", retainedDBPath.c_str());

Expand Down Expand Up @@ -845,7 +845,7 @@ void MainApp::start()
this->bgWorker.waitForStop();

std::list<BridgeInfoForSerializing> bridgeInfos = BridgeInfoForSerializing::getBridgeInfosForSerializing(this->bridges);
saveState(this->settings, bridgeInfos);
saveState(this->settings, bridgeInfos, false);
}

void MainApp::queueQuit()
Expand Down
2 changes: 1 addition & 1 deletion mainapp.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class MainApp
void queuepluginPeriodicEventAllThreads();
void setFuzzFile(const std::string &fuzzFilePath);
void queuePublishStatsOnDollarTopic();
static void saveState(const Settings &settings, const std::list<BridgeInfoForSerializing> &bridgeInfos);
static void saveState(const Settings &settings, const std::list<BridgeInfoForSerializing> &bridgeInfos, bool sleep_after_limit);
static void saveBridgeInfo(const std::string &filePath, const std::list<BridgeInfoForSerializing> &bridgeInfos);
void loadBridgeInfo();
void saveStateInThread();
Expand Down
43 changes: 35 additions & 8 deletions subscriptionstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ See LICENSE for license details.
#include "plugin.h"
#include "exceptions.h"
#include "threaddata.h"
#include <deque>

ReceivingSubscriber::ReceivingSubscriber(const std::shared_ptr<Session> &ses, uint8_t qos, bool retainAsPublished) :
session(ses),
Expand Down Expand Up @@ -1273,7 +1274,9 @@ int64_t SubscriptionStore::getSubscriptionCount()
*/
}

void SubscriptionStore::getRetainedMessages(RetainedMessageNode *this_node, std::vector<RetainedMessage> &outputList) const
void SubscriptionStore::getRetainedMessages(RetainedMessageNode *this_node, std::vector<RetainedMessage> &outputList,
const std::chrono::time_point<std::chrono::steady_clock> &limit,
std::deque<std::weak_ptr<RetainedMessageNode>> &deferred) const
{
{
std::lock_guard<std::mutex> locker(this_node->messageSetMutex);
Expand All @@ -1284,7 +1287,11 @@ void SubscriptionStore::getRetainedMessages(RetainedMessageNode *this_node, std:
for(auto &pair : this_node->children)
{
const std::shared_ptr<RetainedMessageNode> &child = pair.second;
getRetainedMessages(child.get(), outputList);

if (std::chrono::steady_clock::now() > limit)
deferred.push_back(child);
else
getRetainedMessages(child.get(), outputList, limit, deferred);
}
}

Expand Down Expand Up @@ -1398,20 +1405,40 @@ void SubscriptionStore::expireRetainedMessages(RetainedMessageNode *this_node, c
}
}

void SubscriptionStore::saveRetainedMessages(const std::string &filePath)
void SubscriptionStore::saveRetainedMessages(const std::string &filePath, bool sleep_after_limit)
{
logger->logf(LOG_INFO, "Saving retained messages to '%s'", filePath.c_str());

std::vector<RetainedMessage> result;
int64_t reserve = std::max<int64_t>(retainedMessageCount, 0);
reserve = std::min<int64_t>(reserve, 1000000);
result.reserve(reserve);

std::deque<std::weak_ptr<RetainedMessageNode>> deferred;

deferred.push_back(retainedMessagesRoot);

for (; !deferred.empty(); deferred.pop_front())
{
RWLockGuard locker(&retainedMessagesRwlock);
locker.rdlock();
result.reserve(std::max<int64_t>(retainedMessageCount, 0));
getRetainedMessages(retainedMessagesRoot.get(), result);
{
RWLockGuard locker(&retainedMessagesRwlock);
locker.rdlock();

std::shared_ptr<RetainedMessageNode> node = deferred.front().lock();

if (!node)
continue;

const std::chrono::time_point<std::chrono::steady_clock> limit = std::chrono::steady_clock::now() + std::chrono::milliseconds(5);
getRetainedMessages(node.get(), result, limit, deferred);
}

// Because we only do this operation in background threads or on exit, we don't have to requeue, so can just sleep.
if (sleep_after_limit && !deferred.empty())
std::this_thread::sleep_for(std::chrono::microseconds(100));
}

logger->logf(LOG_DEBUG, "Collected %zu retained messages to save.", result.size());
logger->log(LOG_DEBUG) << "Collected " << result.size() << " retained messages to save.";

// Then do the IO without locking the threads.
RetainedMessagesDB db(filePath);
Expand Down
6 changes: 4 additions & 2 deletions subscriptionstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ class SubscriptionStore
const std::chrono::time_point<std::chrono::steady_clock> &limit,
std::deque<DeferredRetainedMessageNodeDelivery> &deferred,
int &drop_count, int &processed_nodes_count);
void getRetainedMessages(RetainedMessageNode *this_node, std::vector<RetainedMessage> &outputList) const;
void getRetainedMessages(RetainedMessageNode *this_node, std::vector<RetainedMessage> &outputList,
const std::chrono::time_point<std::chrono::steady_clock> &limit,
std::deque<std::weak_ptr<RetainedMessageNode>> &deferred) const;
void getSubscriptions(SubscriptionNode *this_node, const std::string &composedTopic, bool root,
std::unordered_map<std::string, std::list<SubscriptionForSerializing>> &outputList) const;
void countSubscriptions(SubscriptionNode *this_node, int64_t &count) const;
Expand Down Expand Up @@ -178,7 +180,7 @@ class SubscriptionStore
uint64_t getSessionCount() const;
int64_t getSubscriptionCount();

void saveRetainedMessages(const std::string &filePath);
void saveRetainedMessages(const std::string &filePath, bool sleep_after_limit);
void loadRetainedMessages(const std::string &filePath);

void saveSessionsAndSubscriptions(const std::string &filePath);
Expand Down

0 comments on commit 517d19e

Please sign in to comment.