diff --git a/FlashMQTests/tst_maintests.cpp b/FlashMQTests/tst_maintests.cpp index ce206fe0..c9e11df3 100644 --- a/FlashMQTests/tst_maintests.cpp +++ b/FlashMQTests/tst_maintests.cpp @@ -1449,6 +1449,67 @@ void MainTests::testBasicsWithFlashMQTestClient() } +void MainTests::testDontRemoveSessionGivenToNewClientWithSameId() +{ + FlashMQTestClient receiver; + receiver.start(); + receiver.connectClient(ProtocolVersion::Mqtt311, true, 60, [] (Connect &c) { + c.clientid = "Sandra-nonrandom"; + }); + receiver.subscribe("just/a/path", 0); + + FlashMQTestClient sender; + sender.start(); + sender.connectClient(ProtocolVersion::Mqtt5); + + { + Publish pub("just/a/path", "AAAAA", 0); + pub.constructPropertyBuilder(); + pub.propertyBuilder->writeTopicAlias(1); + sender.publish(pub); + } + + { + receiver.waitForMessageCount(1); + + const MqttPacket &pack1 = receiver.receivedPublishes.at(0); + + QCOMPARE(pack1.getTopic(), "just/a/path"); + QCOMPARE(pack1.getPayloadCopy(), "AAAAA"); + } + + FlashMQTestClient receiver2; + receiver2.start(); + receiver2.connectClient(ProtocolVersion::Mqtt311, true, 60, [] (Connect &c) { + c.clientid = "Sandra-nonrandom"; + }); + receiver2.subscribe("just/a/path", 0); + + { + Publish pub("just/a/path", "AAAAA", 0); + pub.constructPropertyBuilder(); + pub.propertyBuilder->writeTopicAlias(1); + sender.publish(pub); + } + + { + try + { + receiver2.waitForMessageCount(1); + } + catch(std::exception &ex) + { + QFAIL("The second subscriber did not get the message, so the subscription failed."); + } + + const MqttPacket &pack1 = receiver2.receivedPublishes.at(0); + + QCOMPARE(pack1.getTopic(), "just/a/path"); + QCOMPARE(pack1.getPayloadCopy(), "AAAAA"); + } + +} + void MainTests::testIncomingTopicAlias() { FlashMQTestClient receiver; diff --git a/FlashMQTests/tst_maintests.h b/FlashMQTests/tst_maintests.h index 54658430..97d73667 100644 --- a/FlashMQTests/tst_maintests.h +++ b/FlashMQTests/tst_maintests.h @@ -126,6 +126,8 @@ private slots: void testBasicsWithFlashMQTestClient(); + void testDontRemoveSessionGivenToNewClientWithSameId(); + /** * Will tests. */ diff --git a/client.cpp b/client.cpp index f96e41ca..a15c2202 100644 --- a/client.cpp +++ b/client.cpp @@ -545,7 +545,7 @@ void Client::sendOrQueueWill() return; std::shared_ptr store = MainApp::getMainApp()->getSubscriptionStore(); - store->queueWillMessage(willPublish, session); + store->queueWillMessage(willPublish, clientid, session); this->willPublish.reset(); } @@ -905,6 +905,11 @@ std::shared_ptr Client::getSession() return this->session; } +void Client::resetSession() +{ + this->session.reset(); +} + void Client::setDisconnectReason(const std::string &reason) { if (!this->disconnectReason.empty()) diff --git a/client.h b/client.h index 204843aa..87e58ae5 100644 --- a/client.h +++ b/client.h @@ -153,6 +153,7 @@ class Client const std::shared_ptr &getStagedWill() { return this->stagedWillPublish; } void assignSession(std::shared_ptr &session); std::shared_ptr getSession(); + void resetSession(); void setDisconnectReason(const std::string &reason); std::chrono::seconds getSecondsTillKeepAliveAction() const; diff --git a/session.cpp b/session.cpp index 765d3b64..f071efc2 100644 --- a/session.cpp +++ b/session.cpp @@ -87,7 +87,7 @@ uint16_t Session::getNextPacketId() Session::~Session() { - + logger->log(LOG_DEBUG) << "Session destructor of session with client ID '" << this->client_id << "'."; } /** diff --git a/subscriptionstore.cpp b/subscriptionstore.cpp index 85403313..654f91a9 100644 --- a/subscriptionstore.cpp +++ b/subscriptionstore.cpp @@ -315,6 +315,7 @@ void SubscriptionStore::registerClientAndKickExistingOne(std::shared_ptr if (clientOfOtherSession) { logger->logf(LOG_NOTICE, "Disconnecting existing client with id '%s'", clientOfOtherSession->getClientId().c_str()); + clientOfOtherSession->resetSession(); clientOfOtherSession->setDisconnectReason("Another client with this ID connected"); clientOfOtherSession->serverInitiatedDisconnect(ReasonCodes::SessionTakenOver); } @@ -423,7 +424,7 @@ void SubscriptionStore::sendQueuedWillMessages() * The queued will is only valid for that time. Should a new will be placed in the map for a session, the original shared_ptr * will be cleared and the previously queued entry is void (but still there, so it needs to be checked). */ -void SubscriptionStore::queueWillMessage(const std::shared_ptr &willMessage, const std::shared_ptr &session, bool forceNow) +void SubscriptionStore::queueWillMessage(const std::shared_ptr &willMessage, const std::string &senderClientId, const std::shared_ptr &session, bool forceNow) { if (!willMessage) return; @@ -438,11 +439,6 @@ void SubscriptionStore::queueWillMessage(const std::shared_ptr &wil { if (settings->willsEnabled && auth.aclCheck(*willMessage, willMessage->payload) == AuthResult::success) { - std::string senderClientId; - - if (session) - senderClientId = session->getClientId(); - PublishCopyFactory factory(willMessage.get()); queuePacketAtSubscribers(factory, senderClientId); @@ -451,7 +447,9 @@ void SubscriptionStore::queueWillMessage(const std::shared_ptr &wil } // Avoid sending two immediate wills when a session is destroyed with the client disconnect. - if (session) // session is null when you're destroying a client before a session is assigned. + // Session is null when you're destroying a client before a session is assigned, or + // when an old client has no session anymore after a client with the same ID connects. + if (session) session->clearWill(); return; @@ -887,7 +885,7 @@ void SubscriptionStore::removeSession(const std::shared_ptr &session) std::shared_ptr &will = session->getWill(); if (will) { - queueWillMessage(will, session, true); + queueWillMessage(will, clientid, session, true); } std::list> sessionsToRemove; @@ -1277,7 +1275,7 @@ void SubscriptionStore::loadSessionsAndSubscriptions(const std::string &filePath { sessionsById[session->getClientId()] = session; queueSessionRemoval(session); - queueWillMessage(session->getWill(), session); + queueWillMessage(session->getWill(), session->getClientId(), session); } for (auto &pair : loadedData.subscriptions) diff --git a/subscriptionstore.h b/subscriptionstore.h index dc3bdccf..ea28a415 100644 --- a/subscriptionstore.h +++ b/subscriptionstore.h @@ -144,7 +144,7 @@ class SubscriptionStore std::shared_ptr lockSession(const std::string &clientid); void sendQueuedWillMessages(); - void queueWillMessage(const std::shared_ptr &willMessage, const std::shared_ptr &session, bool forceNow = false); + void queueWillMessage(const std::shared_ptr &willMessage, const std::string &senderClientId, const std::shared_ptr &session, bool forceNow = false); void queuePacketAtSubscribers(PublishCopyFactory ©Factory, const std::string &senderClientId, bool dollar = false); void giveClientRetainedMessages(const std::shared_ptr &ses, const std::vector &subscribeSubtopics, uint8_t max_qos); diff --git a/threaddata.cpp b/threaddata.cpp index f2aa849b..39c33906 100644 --- a/threaddata.cpp +++ b/threaddata.cpp @@ -393,7 +393,7 @@ void ThreadData::clientDisconnectActions(bool authenticated, const std::string & if (willPublish) { - store->queueWillMessage(willPublish, session); + store->queueWillMessage(willPublish, clientid, session); } if (session && session->getDestroyOnDisconnect())