-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathsharedsubscribers.cpp
140 lines (114 loc) · 3.37 KB
/
sharedsubscribers.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
/*
This file is part of FlashMQ (https://www.flashmq.org)
Copyright (C) 2021-2023 Wiebe Cazemier
FlashMQ is free software: you can redistribute it and/or modify
it under the terms of The Open Software License 3.0 (OSL-3.0).
See LICENSE for license details.
*/
#include "sharedsubscribers.h"
#include <cassert>
/**
 * @brief SharedSubscribers::SharedSubscribers constructs an object without doing any work in the constructor body.
 *
 * Whatever initial state the members have comes from the class definition in the header, not from here.
 */
SharedSubscribers::SharedSubscribers() noexcept
{
}
/**
 * @brief SharedSubscribers::setName stores the share name, but only the first time a usable one is given.
 * @param name the share name to store. An empty name is ignored.
 *
 * Once a non-empty name has been stored, later calls leave it untouched.
 */
void SharedSubscribers::setName(const std::string &name)
{
    const bool alreadyNamed = !shareName.empty();

    if (!alreadyNamed && !name.empty())
        this->shareName = name;
}
/**
 * @brief SharedSubscribers::operator [] returns the subscription slot of a client, creating an empty one when the client is not known yet.
 * @param clientid the client whose slot is wanted.
 * @return reference to that client's Subscription inside the member list.
 *
 * Creating a new slot appends to the member list, which may relocate all existing slots. A reference obtained from
 * an earlier call is therefore likely invalid after the next call, so don't keep lingering references around.
 */
Subscription &SharedSubscribers::operator[](const std::string &clientid)
{
    const auto known = index.find(clientid);

    if (known == index.end())
    {
        // Unknown client: it gets the slot at the current end of the member list.
        const int slot = members.size();
        index[clientid] = slot;
        members.emplace_back();
        return members.back();
    }

    const int slot = known->second;
    assert(slot < static_cast<int>(members.size()));
    return members[slot];
}
/**
 * @brief SharedSubscribers::getNext picks the next member in round-robin order whose session has not expired.
 * @return pointer to that member, or nullptr when there are no members or every session has expired.
 *
 * Each member is probed at most once per call. The round-robin counter advances on every probe, including
 * the probes that hit an expired session.
 */
const Subscription *SharedSubscribers::getNext()
{
    const size_t memberCount = members.size();

    for (size_t attempt = 0; attempt < memberCount; attempt++)
    {
        // This counter use is not thread safe / atomic, but it doesn't matter much.
        const Subscription &candidate = members[roundRobinCounter++ % memberCount];

        if (!candidate.session.expired())
            return &candidate;
    }

    return nullptr;
}
/**
 * @brief SharedSubscribers::getNext picks a member based on a hash, skipping members whose session has expired.
 * @param hash value that selects the starting position (hash modulo the number of members).
 * @return pointer to the first non-expired member found from the starting position onward (wrapping around),
 *         or nullptr when there are no members or every session has expired.
 *
 * Unlike the round-robin overload, this one does not touch the round-robin counter, so the same hash keeps
 * selecting the same member for as long as the member list and the sessions stay the same.
 */
const Subscription *SharedSubscribers::getNext(size_t hash) const
{
    const size_t memberCount = members.size();

    // With no members, 'hash % memberCount' would be a modulo by zero, which is undefined behavior.
    if (memberCount == 0)
        return nullptr;

    size_t pos = hash % memberCount;

    for (size_t i = 0; i < memberCount; i++)
    {
        const Subscription &s = members[pos];

        if (!s.session.expired())
            return &s;

        // Advance with wrap-around so every member is probed exactly once.
        pos = (pos + 1) % memberCount;
    }

    return nullptr;
}
/**
 * @brief SharedSubscribers::erase resets the subscription slot of a client, if the client is known.
 * @param clientid the client whose slot is to be reset.
 *
 * Only reset() is called on the slot; the slot itself and the client's index entry stay in place.
 * Unknown clients are ignored.
 */
void SharedSubscribers::erase(const std::string &clientid)
{
    const auto found = index.find(clientid);

    if (found == index.end())
        return;

    const int slot = found->second;
    assert(slot < static_cast<int>(members.size()));
    members[slot].reset();
}
void SharedSubscribers::purgeAndReIndex()
{
int i = 0;
std::vector<Subscription> newMembers;
std::unordered_map<std::string, int> newIndex;
for (auto &pair : index)
{
const int index = pair.second;
Subscription &sub = members[index];
if (sub.session.expired())
continue;
newMembers.push_back(sub);
newIndex[pair.first] = i;
i++;
}
this->members = std::move(newMembers);
this->index = std::move(newIndex);
}
/**
 * @brief SharedSubscribers::empty tells whether the member list holds no slots at all.
 * @return true when there are no slots.
 *
 * Only the container is checked; the sessions of the members are not looked at.
 */
bool SharedSubscribers::empty() const
{
    const bool hasNoSlots = members.empty();
    return hasNoSlots;
}
/**
 * @brief SharedSubscribers::getForSerializing appends a serializable record for every member whose session can still be locked.
 * @param topic key under which the records are collected in outputList.
 * @param outputList receives the records. The entry for topic is only created once there is a record to add.
 *
 * Members whose session is gone are skipped.
 */
void SharedSubscribers::getForSerializing(const std::string &topic, std::unordered_map<std::string, std::list<SubscriptionForSerializing>> &outputList) const
{
    for (const Subscription &member : members)
    {
        std::shared_ptr<Session> ses = member.session.lock();

        if (!ses)
            continue;

        SubscriptionForSerializing sub(ses->getClientId(), member.qos, member.noLocal, member.retainAsPublished, this->shareName);
        outputList[topic].push_back(sub);
    }
}