Skip to content

Commit

Permalink
Fix deleting session with the same ID on reconnect
Browse files Browse the repository at this point in the history
Reconnecting with the same client ID that still exists on the server
results disconnection and deletion of the old client, and it erroneously
queued a deletion of the session with that ID, resulting in deleting the
freshly created session that replaces the old one.

Fixed by clearing the session of the client that's being disconnected.

This fixes losing subscriptions in these scenarios.

Small note about session destructor logging: previously this was removed
because it caused this line to be printed on copied sessions on saving
them to disk. But, we don't copy anymore, so now we can log again.
  • Loading branch information
halfgaar committed Nov 7, 2023
1 parent 55c50aa commit f43245e
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 13 deletions.
61 changes: 61 additions & 0 deletions FlashMQTests/tst_maintests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1449,6 +1449,67 @@ void MainTests::testBasicsWithFlashMQTestClient()

}

void MainTests::testDontRemoveSessionGivenToNewClientWithSameId()
{
FlashMQTestClient receiver;
receiver.start();
receiver.connectClient(ProtocolVersion::Mqtt311, true, 60, [] (Connect &c) {
c.clientid = "Sandra-nonrandom";
});
receiver.subscribe("just/a/path", 0);

FlashMQTestClient sender;
sender.start();
sender.connectClient(ProtocolVersion::Mqtt5);

{
Publish pub("just/a/path", "AAAAA", 0);
pub.constructPropertyBuilder();
pub.propertyBuilder->writeTopicAlias(1);
sender.publish(pub);
}

{
receiver.waitForMessageCount(1);

const MqttPacket &pack1 = receiver.receivedPublishes.at(0);

QCOMPARE(pack1.getTopic(), "just/a/path");
QCOMPARE(pack1.getPayloadCopy(), "AAAAA");
}

FlashMQTestClient receiver2;
receiver2.start();
receiver2.connectClient(ProtocolVersion::Mqtt311, true, 60, [] (Connect &c) {
c.clientid = "Sandra-nonrandom";
});
receiver2.subscribe("just/a/path", 0);

{
Publish pub("just/a/path", "AAAAA", 0);
pub.constructPropertyBuilder();
pub.propertyBuilder->writeTopicAlias(1);
sender.publish(pub);
}

{
try
{
receiver2.waitForMessageCount(1);
}
catch(std::exception &ex)
{
QFAIL("The second subscriber did not get the message, so the subscription failed.");
}

const MqttPacket &pack1 = receiver2.receivedPublishes.at(0);

QCOMPARE(pack1.getTopic(), "just/a/path");
QCOMPARE(pack1.getPayloadCopy(), "AAAAA");
}

}

void MainTests::testIncomingTopicAlias()
{
FlashMQTestClient receiver;
Expand Down
2 changes: 2 additions & 0 deletions FlashMQTests/tst_maintests.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ private slots:

void testBasicsWithFlashMQTestClient();

void testDontRemoveSessionGivenToNewClientWithSameId();

/**
* Will tests.
*/
Expand Down
7 changes: 6 additions & 1 deletion client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ void Client::sendOrQueueWill()
return;

std::shared_ptr<SubscriptionStore> store = MainApp::getMainApp()->getSubscriptionStore();
store->queueWillMessage(willPublish, session);
store->queueWillMessage(willPublish, clientid, session);
this->willPublish.reset();
}

Expand Down Expand Up @@ -905,6 +905,11 @@ std::shared_ptr<Session> Client::getSession()
return this->session;
}

void Client::resetSession()
{
this->session.reset();
}

void Client::setDisconnectReason(const std::string &reason)
{
if (!this->disconnectReason.empty())
Expand Down
1 change: 1 addition & 0 deletions client.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ class Client
const std::shared_ptr<WillPublish> &getStagedWill() { return this->stagedWillPublish; }
void assignSession(std::shared_ptr<Session> &session);
std::shared_ptr<Session> getSession();
void resetSession();
void setDisconnectReason(const std::string &reason);
std::chrono::seconds getSecondsTillKeepAliveAction() const;

Expand Down
2 changes: 1 addition & 1 deletion session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ uint16_t Session::getNextPacketId()

Session::~Session()
{

logger->log(LOG_DEBUG) << "Session destructor of session with client ID '" << this->client_id << "'.";
}

/**
Expand Down
16 changes: 7 additions & 9 deletions subscriptionstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr<Client>
if (clientOfOtherSession)
{
logger->logf(LOG_NOTICE, "Disconnecting existing client with id '%s'", clientOfOtherSession->getClientId().c_str());
clientOfOtherSession->resetSession();
clientOfOtherSession->setDisconnectReason("Another client with this ID connected");
clientOfOtherSession->serverInitiatedDisconnect(ReasonCodes::SessionTakenOver);
}
Expand Down Expand Up @@ -423,7 +424,7 @@ void SubscriptionStore::sendQueuedWillMessages()
* The queued will is only valid for that time. Should a new will be placed in the map for a session, the original shared_ptr
* will be cleared and the previously queued entry is void (but still there, so it needs to be checked).
*/
void SubscriptionStore::queueWillMessage(const std::shared_ptr<WillPublish> &willMessage, const std::shared_ptr<Session> &session, bool forceNow)
void SubscriptionStore::queueWillMessage(const std::shared_ptr<WillPublish> &willMessage, const std::string &senderClientId, const std::shared_ptr<Session> &session, bool forceNow)
{
if (!willMessage)
return;
Expand All @@ -438,11 +439,6 @@ void SubscriptionStore::queueWillMessage(const std::shared_ptr<WillPublish> &wil
{
if (settings->willsEnabled && auth.aclCheck(*willMessage, willMessage->payload) == AuthResult::success)
{
std::string senderClientId;

if (session)
senderClientId = session->getClientId();

PublishCopyFactory factory(willMessage.get());
queuePacketAtSubscribers(factory, senderClientId);

Expand All @@ -451,7 +447,9 @@ void SubscriptionStore::queueWillMessage(const std::shared_ptr<WillPublish> &wil
}

// Avoid sending two immediate wills when a session is destroyed with the client disconnect.
if (session) // session is null when you're destroying a client before a session is assigned.
// Session is null when you're destroying a client before a session is assigned, or
// when an old client has no session anymore after a client with the same ID connects.
if (session)
session->clearWill();

return;
Expand Down Expand Up @@ -887,7 +885,7 @@ void SubscriptionStore::removeSession(const std::shared_ptr<Session> &session)
std::shared_ptr<WillPublish> &will = session->getWill();
if (will)
{
queueWillMessage(will, session, true);
queueWillMessage(will, clientid, session, true);
}

std::list<std::shared_ptr<Session>> sessionsToRemove;
Expand Down Expand Up @@ -1277,7 +1275,7 @@ void SubscriptionStore::loadSessionsAndSubscriptions(const std::string &filePath
{
sessionsById[session->getClientId()] = session;
queueSessionRemoval(session);
queueWillMessage(session->getWill(), session);
queueWillMessage(session->getWill(), session->getClientId(), session);
}

for (auto &pair : loadedData.subscriptions)
Expand Down
2 changes: 1 addition & 1 deletion subscriptionstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class SubscriptionStore
std::shared_ptr<Session> lockSession(const std::string &clientid);

void sendQueuedWillMessages();
void queueWillMessage(const std::shared_ptr<WillPublish> &willMessage, const std::shared_ptr<Session> &session, bool forceNow = false);
void queueWillMessage(const std::shared_ptr<WillPublish> &willMessage, const std::string &senderClientId, const std::shared_ptr<Session> &session, bool forceNow = false);
void queuePacketAtSubscribers(PublishCopyFactory &copyFactory, const std::string &senderClientId, bool dollar = false);
void giveClientRetainedMessages(const std::shared_ptr<Session> &ses,
const std::vector<std::string> &subscribeSubtopics, uint8_t max_qos);
Expand Down
2 changes: 1 addition & 1 deletion threaddata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ void ThreadData::clientDisconnectActions(bool authenticated, const std::string &

if (willPublish)
{
store->queueWillMessage(willPublish, session);
store->queueWillMessage(willPublish, clientid, session);
}

if (session && session->getDestroyOnDisconnect())
Expand Down

0 comments on commit f43245e

Please sign in to comment.