From f43245ef851055c19435d6745d37fc2bb6c08a1b Mon Sep 17 00:00:00 2001 From: Wiebe Cazemier Date: Tue, 7 Nov 2023 12:34:04 +0100 Subject: [PATCH] Fix deleting session with the same ID on reconnect Reconnecting with the same client ID that still exists on the server results disconnection and deletion of the old client, and it erroneously queued a deletion of the session with that ID, resulting in deleting the freshly created session that replaces the old one. Fixed by clearing the session of the client that's being disconnected. This fixes losing subscriptions in these scenarios. Small note about session destructor logging: previously this was removed because it caused this line to be printed on copied sessions on saving them to disk. But, we don't copy anymore, so now we can log again. --- FlashMQTests/tst_maintests.cpp | 61 ++++++++++++++++++++++++++++++++++ FlashMQTests/tst_maintests.h | 2 ++ client.cpp | 7 +++- client.h | 1 + session.cpp | 2 +- subscriptionstore.cpp | 16 ++++----- subscriptionstore.h | 2 +- threaddata.cpp | 2 +- 8 files changed, 80 insertions(+), 13 deletions(-) diff --git a/FlashMQTests/tst_maintests.cpp b/FlashMQTests/tst_maintests.cpp index ce206fe0..c9e11df3 100644 --- a/FlashMQTests/tst_maintests.cpp +++ b/FlashMQTests/tst_maintests.cpp @@ -1449,6 +1449,67 @@ void MainTests::testBasicsWithFlashMQTestClient() } +void MainTests::testDontRemoveSessionGivenToNewClientWithSameId() +{ + FlashMQTestClient receiver; + receiver.start(); + receiver.connectClient(ProtocolVersion::Mqtt311, true, 60, [] (Connect &c) { + c.clientid = "Sandra-nonrandom"; + }); + receiver.subscribe("just/a/path", 0); + + FlashMQTestClient sender; + sender.start(); + sender.connectClient(ProtocolVersion::Mqtt5); + + { + Publish pub("just/a/path", "AAAAA", 0); + pub.constructPropertyBuilder(); + pub.propertyBuilder->writeTopicAlias(1); + sender.publish(pub); + } + + { + receiver.waitForMessageCount(1); + + const MqttPacket &pack1 = receiver.receivedPublishes.at(0); + + QCOMPARE(pack1.getTopic(), "just/a/path"); + QCOMPARE(pack1.getPayloadCopy(), "AAAAA"); + } + + FlashMQTestClient receiver2; + receiver2.start(); + receiver2.connectClient(ProtocolVersion::Mqtt311, true, 60, [] (Connect &c) { + c.clientid = "Sandra-nonrandom"; + }); + receiver2.subscribe("just/a/path", 0); + + { + Publish pub("just/a/path", "AAAAA", 0); + pub.constructPropertyBuilder(); + pub.propertyBuilder->writeTopicAlias(1); + sender.publish(pub); + } + + { + try + { + receiver2.waitForMessageCount(1); + } + catch(std::exception &ex) + { + QFAIL("The second subscriber did not get the message, so the subscription failed."); + } + + const MqttPacket &pack1 = receiver2.receivedPublishes.at(0); + + QCOMPARE(pack1.getTopic(), "just/a/path"); + QCOMPARE(pack1.getPayloadCopy(), "AAAAA"); + } + +} + void MainTests::testIncomingTopicAlias() { FlashMQTestClient receiver; diff --git a/FlashMQTests/tst_maintests.h b/FlashMQTests/tst_maintests.h index 54658430..97d73667 100644 --- a/FlashMQTests/tst_maintests.h +++ b/FlashMQTests/tst_maintests.h @@ -126,6 +126,8 @@ private slots: void testBasicsWithFlashMQTestClient(); + void testDontRemoveSessionGivenToNewClientWithSameId(); + /** * Will tests. */ diff --git a/client.cpp b/client.cpp index f96e41ca..a15c2202 100644 --- a/client.cpp +++ b/client.cpp @@ -545,7 +545,7 @@ void Client::sendOrQueueWill() return; std::shared_ptr store = MainApp::getMainApp()->getSubscriptionStore(); - store->queueWillMessage(willPublish, session); + store->queueWillMessage(willPublish, clientid, session); this->willPublish.reset(); } @@ -905,6 +905,11 @@ std::shared_ptr Client::getSession() return this->session; } +void Client::resetSession() +{ + this->session.reset(); +} + void Client::setDisconnectReason(const std::string &reason) { if (!this->disconnectReason.empty()) diff --git a/client.h b/client.h index 204843aa..87e58ae5 100644 --- a/client.h +++ b/client.h @@ -153,6 +153,7 @@ class Client const std::shared_ptr &getStagedWill() { return this->stagedWillPublish; } void assignSession(std::shared_ptr &session); std::shared_ptr getSession(); + void resetSession(); void setDisconnectReason(const std::string &reason); std::chrono::seconds getSecondsTillKeepAliveAction() const; diff --git a/session.cpp b/session.cpp index 765d3b64..f071efc2 100644 --- a/session.cpp +++ b/session.cpp @@ -87,7 +87,7 @@ uint16_t Session::getNextPacketId() Session::~Session() { - + logger->log(LOG_DEBUG) << "Session destructor of session with client ID '" << this->client_id << "'."; } /** diff --git a/subscriptionstore.cpp b/subscriptionstore.cpp index 85403313..654f91a9 100644 --- a/subscriptionstore.cpp +++ b/subscriptionstore.cpp @@ -315,6 +315,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr if (clientOfOtherSession) { logger->logf(LOG_NOTICE, "Disconnecting existing client with id '%s'", clientOfOtherSession->getClientId().c_str()); + clientOfOtherSession->resetSession(); clientOfOtherSession->setDisconnectReason("Another client with this ID connected"); clientOfOtherSession->serverInitiatedDisconnect(ReasonCodes::SessionTakenOver); } @@ -423,7 +424,7 @@ void SubscriptionStore::sendQueuedWillMessages() * The queued will is only valid for that time. Should a new will be placed in the map for a session, the original shared_ptr * will be cleared and the previously queued entry is void (but still there, so it needs to be checked). */ -void SubscriptionStore::queueWillMessage(const std::shared_ptr &willMessage, const std::shared_ptr &session, bool forceNow) +void SubscriptionStore::queueWillMessage(const std::shared_ptr &willMessage, const std::string &senderClientId, const std::shared_ptr &session, bool forceNow) { if (!willMessage) return; @@ -438,11 +439,6 @@ void SubscriptionStore::queueWillMessage(const std::shared_ptr &wil { if (settings->willsEnabled && auth.aclCheck(*willMessage, willMessage->payload) == AuthResult::success) { - std::string senderClientId; - - if (session) - senderClientId = session->getClientId(); - PublishCopyFactory factory(willMessage.get()); queuePacketAtSubscribers(factory, senderClientId); @@ -451,7 +447,9 @@ void SubscriptionStore::queueWillMessage(const std::shared_ptr &wil } // Avoid sending two immediate wills when a session is destroyed with the client disconnect. - if (session) // session is null when you're destroying a client before a session is assigned. + // Session is null when you're destroying a client before a session is assigned, or + // when an old client has no session anymore after a client with the same ID connects. + if (session) session->clearWill(); return; @@ -887,7 +885,7 @@ void SubscriptionStore::removeSession(const std::shared_ptr &session) std::shared_ptr &will = session->getWill(); if (will) { - queueWillMessage(will, session, true); + queueWillMessage(will, clientid, session, true); } std::list> sessionsToRemove; @@ -1277,7 +1275,7 @@ void SubscriptionStore::loadSessionsAndSubscriptions(const std::string &filePath { sessionsById[session->getClientId()] = session; queueSessionRemoval(session); - queueWillMessage(session->getWill(), session); + queueWillMessage(session->getWill(), session->getClientId(), session); } for (auto &pair : loadedData.subscriptions) diff --git a/subscriptionstore.h b/subscriptionstore.h index dc3bdccf..ea28a415 100644 --- a/subscriptionstore.h +++ b/subscriptionstore.h @@ -144,7 +144,7 @@ class SubscriptionStore std::shared_ptr lockSession(const std::string &clientid); void sendQueuedWillMessages(); - void queueWillMessage(const std::shared_ptr &willMessage, const std::shared_ptr &session, bool forceNow = false); + void queueWillMessage(const std::shared_ptr &willMessage, const std::string &senderClientId, const std::shared_ptr &session, bool forceNow = false); void queuePacketAtSubscribers(PublishCopyFactory ©Factory, const std::string &senderClientId, bool dollar = false); void giveClientRetainedMessages(const std::shared_ptr &ses, const std::vector &subscribeSubtopics, uint8_t max_qos); diff --git a/threaddata.cpp b/threaddata.cpp index f2aa849b..39c33906 100644 --- a/threaddata.cpp +++ b/threaddata.cpp @@ -393,7 +393,7 @@ void ThreadData::clientDisconnectActions(bool authenticated, const std::string & if (willPublish) { - store->queueWillMessage(willPublish, session); + store->queueWillMessage(willPublish, clientid, session); } if (session && session->getDestroyOnDisconnect())