-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathsubscriptionstore.h
193 lines (154 loc) · 8.92 KB
/
subscriptionstore.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
/*
This file is part of FlashMQ (https://www.flashmq.org)
Copyright (C) 2021-2023 Wiebe Cazemier
FlashMQ is free software: you can redistribute it and/or modify
it under the terms of The Open Software License 3.0 (OSL-3.0).
See LICENSE for license details.
*/
#ifndef SUBSCRIPTIONSTORE_H
#define SUBSCRIPTIONSTORE_H
#include <unordered_map>
#include <forward_list>
#include <list>
#include <mutex>
#include <map>
#include <vector>
#include <pthread.h>
#include <optional>
#include "client.h"
#include "session.h"
#include "retainedmessage.h"
#include "logger.h"
#include "subscription.h"
#include "sharedsubscribers.h"
/**
 * Immutable bundle of a session together with a QoS value and a
 * retain-as-published flag.
 *
 * All three fields are const, so an instance cannot be modified (or assigned
 * to) after construction. The constructor is defined outside this header.
 *
 * NOTE(review): the publish helpers declared in SubscriptionStore collect
 * these in a std::forward_list named targetSessions, so this presumably
 * describes one delivery target of a publish - confirm in the .cpp.
 */
struct ReceivingSubscriber
{
    const std::shared_ptr<Session> session;
    const uint8_t qos;
    const bool retainAsPublished;

    ReceivingSubscriber(const std::shared_ptr<Session> &ses, uint8_t qos, bool retainAsPublished);
};
/**
 * A node in a tree of subscriptions.
 *
 * Each node privately holds two string-keyed maps (plain subscribers and
 * shared subscribers) and publicly exposes its child links: a string-keyed map
 * of children plus two dedicated child slots.
 *
 * Copy and move construction are deleted, so nodes stay at a fixed address and
 * are linked through std::shared_ptr.
 *
 * NOTE(review): childrenPlus / childrenPound presumably hold the subtrees for
 * the MQTT '+' and '#' wildcards, and the map keys are presumably client IDs
 * and share names respectively - confirm against the implementation.
 */
class SubscriptionNode
{
private:
    std::unordered_map<std::string, Subscription> subscribers;
    std::unordered_map<std::string, SharedSubscribers> sharedSubscribers;

public:
    // Child links. Declared in the same relative order as before so that
    // construction/destruction order of the data members is unchanged.
    std::unordered_map<std::string, std::shared_ptr<SubscriptionNode>> children;
    std::shared_ptr<SubscriptionNode> childrenPlus;
    std::shared_ptr<SubscriptionNode> childrenPound;

    SubscriptionNode();
    SubscriptionNode(const SubscriptionNode &node) = delete;
    SubscriptionNode(SubscriptionNode &&node) = delete;

    // Accessors for the two private maps. Note the asymmetry: the plain
    // subscribers are handed out read-only, the shared ones are mutable.
    const std::unordered_map<std::string, Subscription> &getSubscribers() const;
    std::unordered_map<std::string, SharedSubscribers> &getSharedSubscribers();

    void addSubscriber(const std::shared_ptr<Session> &subscriber, uint8_t qos, bool noLocal, bool retainAsPublished, const std::string &shareName);
    void removeSubscriber(const std::shared_ptr<Session> &subscriber, const std::string &shareName);

    // Returns a raw, non-owning pointer.
    // NOTE(review): presumably nullptr when no child matches - verify.
    SubscriptionNode *getChildren(const std::string &subtopic) const;

    // NOTE(review): the meaning of the int result and of what gets pushed
    // onto defferedLeafs is not visible from this header - see the .cpp.
    int cleanSubscriptions(std::deque<std::weak_ptr<SubscriptionNode>> &defferedLeafs);

    bool empty() const;
};
/**
 * A node in a tree of retained messages.
 *
 * Every member is private; SubscriptionStore is a friend and is the only code
 * in this header that reaches into a node. A node owns at most one
 * RetainedMessage (via std::unique_ptr) and a string-keyed map of children.
 *
 * NOTE(review): messageSetMutex presumably serialises access to `message` -
 * the locking discipline is not visible here, confirm in the .cpp.
 */
class RetainedMessageNode
{
    friend class SubscriptionStore;

private:
    std::unordered_map<std::string, std::shared_ptr<RetainedMessageNode>> children;
    std::mutex messageSetMutex;
    std::unique_ptr<RetainedMessage> message;

    // totalCount is an in/out reference supplied by the caller.
    // NOTE(review): presumably adjusted when a message is added or cleared.
    void addPayload(const Publish &publish, int64_t &totalCount);

    // Returns an owning pointer, unlike SubscriptionNode::getChildren which
    // returns a raw pointer.
    std::shared_ptr<RetainedMessageNode> getChildren(const std::string &subtopic) const;

    bool isOrphaned() const;
};
/**
 * Pairs a will message with a session, holding both only weakly.
 *
 * Because the members are std::weak_ptr, a QueuedWill does not keep either the
 * WillPublish or the Session alive; either may have expired by the time it is
 * inspected.
 */
class QueuedWill
{
private:
    std::weak_ptr<WillPublish> will;
    std::weak_ptr<Session> session;

public:
    QueuedWill(const std::shared_ptr<WillPublish> &will, const std::shared_ptr<Session> &session);

    // Hands back the weak reference itself; the caller decides whether to lock it.
    const std::weak_ptr<WillPublish> &getWill() const;

    // Returns an owning pointer.
    // NOTE(review): presumably session.lock(), i.e. null once the session is
    // gone - confirm in the .cpp.
    std::shared_ptr<Session> getSession();
};
/**
 * Plain record describing a position in a retained-message tree walk: a weakly
 * held node, an iterator range [cur, end) over a vector of strings, and a
 * flag.
 *
 * NOTE(review): the iterators point into a vector owned elsewhere, so that
 * vector must outlive this record. The fields mirror the parameters of
 * SubscriptionStore::giveClientRetainedMessagesRecursively, so this presumably
 * stores the state needed to resume that walk later - confirm in the .cpp.
 */
struct DeferredRetainedMessageNodeDelivery
{
    std::weak_ptr<RetainedMessageNode> node;
    std::vector<std::string>::const_iterator cur;
    std::vector<std::string>::const_iterator end;
    bool poundMode{false};
};
/**
 * Central store declared by this header. It holds two subscription tree roots,
 * a map of sessions by string ID, two retained-message tree roots, queued will
 * messages and queued session removals, plus the locks declared alongside
 * them.
 *
 * Only declarations are visible here; all behaviour lives in the .cpp.
 *
 * NOTE(review): which lock protects which member is inferred from naming and
 * declaration grouping only - confirm against the implementation before
 * relying on it. Likewise, the "Dollar" roots presumably serve topics starting
 * with '$' (compare the `dollar` parameter of queuePacketAtSubscribers).
 */
class SubscriptionStore
{
#ifdef TESTING
    friend class MainTests;
#endif

    // --- Subscription tree and sessions -------------------------------------
    SubscriptionNode root;
    SubscriptionNode rootDollar;
    pthread_rwlock_t sessionsAndSubscriptionsRwlock = PTHREAD_RWLOCK_INITIALIZER;
    std::unordered_map<std::string, std::shared_ptr<Session>> sessionsById;
    // A const reference member must be bound in the constructor's initializer
    // list. NOTE(review): presumably bound to sessionsById as a read-only view.
    const std::unordered_map<std::string, std::shared_ptr<Session>> &sessionsByIdConst;

    // --- Queued session removals, keyed by a std::chrono::seconds value -----
    std::mutex queuedSessionRemovalsMutex;
    std::map<std::chrono::seconds, std::vector<std::weak_ptr<Session>>> queuedSessionRemovals;

    // --- Retained messages ---------------------------------------------------
    pthread_rwlock_t retainedMessagesRwlock = PTHREAD_RWLOCK_INITIALIZER;
    std::deque<std::weak_ptr<RetainedMessageNode>> deferredRetainedMessageNodeToPurge;
    const std::shared_ptr<RetainedMessageNode> retainedMessagesRoot = std::make_shared<RetainedMessageNode>();
    const std::shared_ptr<RetainedMessageNode> retainedMessagesRootDollar = std::make_shared<RetainedMessageNode>();
    int64_t retainedMessageCount = 0;

    // --- Subscription counting ----------------------------------------------
    int64_t subscriptionCount = 0;
    std::chrono::time_point<std::chrono::steady_clock> lastSubscriptionCountRefreshedAt;

    // --- Pending will messages, keyed by a std::chrono::seconds value -------
    std::mutex pendingWillsMutex;
    std::map<std::chrono::seconds, std::vector<QueuedWill>> pendingWillMessages;

    std::deque<std::weak_ptr<SubscriptionNode>> deferredSubscriptionLeafsForPurging;

    Logger *logger = Logger::getInstance();

    // --- Private helpers -----------------------------------------------------
    // The two publish helpers and the retained-message walker are static:
    // they work only on the nodes and containers passed in.
    static void publishNonRecursively(SubscriptionNode *this_node,
                                      std::forward_list<ReceivingSubscriber> &targetSessions, size_t distributionHash, const std::string &senderClientId);
    static void publishRecursively(std::vector<std::string>::const_iterator cur_subtopic_it, std::vector<std::string>::const_iterator end,
                                   SubscriptionNode *this_node, std::forward_list<ReceivingSubscriber> &targetSessions, size_t distributionHash, const std::string &senderClientId);
    static void giveClientRetainedMessagesRecursively(std::vector<std::string>::const_iterator cur_subtopic_it,
                                                      std::vector<std::string>::const_iterator end, const std::shared_ptr<RetainedMessageNode> &this_node, bool poundMode,
                                                      const std::shared_ptr<Session> &session, const uint8_t max_qos,
                                                      const std::chrono::time_point<std::chrono::steady_clock> &limit,
                                                      std::deque<DeferredRetainedMessageNodeDelivery> &deferred,
                                                      int &drop_count, int &processed_nodes_count);
    void getRetainedMessages(RetainedMessageNode *this_node, std::vector<RetainedMessage> &outputList,
                             const std::chrono::time_point<std::chrono::steady_clock> &limit,
                             std::deque<std::weak_ptr<RetainedMessageNode>> &deferred) const;
    void getSubscriptions(SubscriptionNode *this_node, const std::string &composedTopic, bool root,
                          std::unordered_map<std::string, std::list<SubscriptionForSerializing>> &outputList) const;
    void countSubscriptions(SubscriptionNode *this_node, int64_t &count) const;
    void expireRetainedMessages(RetainedMessageNode *this_node, const std::chrono::time_point<std::chrono::steady_clock> &limit,
                                std::deque<std::weak_ptr<RetainedMessageNode>> &deferred);
    SubscriptionNode *getDeepestNode(const std::vector<std::string> &subtopics);

public:
    SubscriptionStore();

    // --- Subscriptions -------------------------------------------------------
    void addSubscription(std::shared_ptr<Client> &client, const std::vector<std::string> &subtopics, uint8_t qos, bool noLocal, bool retainAsPublished);
    void addSubscription(std::shared_ptr<Client> &client, const std::vector<std::string> &subtopics, uint8_t qos, bool noLocal, bool retainAsPublished,
                         const std::string &shareName, AuthResult authResult);
    void removeSubscription(std::shared_ptr<Client> &client, const std::string &topic);

    // --- Sessions and clients ------------------------------------------------
    std::shared_ptr<Session> getBridgeSession(std::shared_ptr<Client> &client);
    void registerClientAndKickExistingOne(std::shared_ptr<Client> &client);
    void registerClientAndKickExistingOne(std::shared_ptr<Client> &client, bool clean_start, uint16_t clientReceiveMax, uint32_t sessionExpiryInterval);
    std::shared_ptr<Session> lockSession(const std::string &clientid);

    // --- Will messages -------------------------------------------------------
    void sendQueuedWillMessages();
    void queueWillMessage(const std::shared_ptr<WillPublish> &willMessage, const std::string &senderClientId, const std::shared_ptr<Session> &session, bool forceNow = false);

    // --- Publishing ----------------------------------------------------------
    // copyFactory is taken by non-const reference; `dollar` defaults to false.
    void queuePacketAtSubscribers(PublishCopyFactory &copyFactory, const std::string &senderClientId, bool dollar = false);

    // --- Retained messages ---------------------------------------------------
    void giveClientRetainedMessages(const std::shared_ptr<Session> &ses,
                                    const std::vector<std::string> &subscribeSubtopics, uint8_t max_qos);
    // Takes its smart-pointer arguments by value, so the callee holds its own
    // references for the duration of the call.
    void giveClientRetainedMessagesInitiateDeferred(const std::weak_ptr<Session> ses,
                                                    const std::shared_ptr<const std::vector<std::string>> subscribeSubtopicsCopy,
                                                    std::shared_ptr<std::deque<DeferredRetainedMessageNodeDelivery>> deferred,
                                                    int &requeue_count, uint &total_node_count, uint8_t max_qos);
    void setRetainedMessage(const Publish &publish, const std::vector<std::string> &subtopics);

    // --- Housekeeping --------------------------------------------------------
    void removeSession(const std::shared_ptr<Session> &session);
    void removeExpiredSessionsClients();
    bool hasDeferredSubscriptionTreeNodesForPurging();
    bool purgeSubscriptionTree();
    bool hasDeferredRetainedMessageNodesForPurging();
    // NOTE(review): the meaning of the bool results of the purge/expire calls
    // is not visible from this header - see the .cpp.
    bool expireRetainedMessages();

    // --- Statistics ----------------------------------------------------------
    int64_t getRetainedMessageCount() const;
    uint64_t getSessionCount() const;
    // Non-const, unlike the other two getters.
    int64_t getSubscriptionCount();

    // --- Persistence ---------------------------------------------------------
    void saveRetainedMessages(const std::string &filePath, bool sleep_after_limit);
    void loadRetainedMessages(const std::string &filePath);
    void saveSessionsAndSubscriptions(const std::string &filePath);
    void loadSessionsAndSubscriptions(const std::string &filePath);

    void queueSessionRemoval(const std::shared_ptr<Session> &session);
};
#endif // SUBSCRIPTIONSTORE_H