Skip to content

Commit

Permalink
Subscription identifier for offline queued packets
Browse files Browse the repository at this point in the history
  • Loading branch information
halfgaar committed Nov 9, 2024
1 parent 55d11c1 commit 55594af
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 4 deletions.
2 changes: 2 additions & 0 deletions mqttpacket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1868,7 +1868,9 @@ void MqttPacket::parsePublishData()
if (sender->getClientType() != ClientType::LocalBridge)
throw ProtocolError("Subscription identifiers cannot be sent to servers.", ReasonCodes::ProtocolError);

// We don't store it, because it should not propagate.
decodeVariableByteIntAtPos();

break;
}
case Mqtt5Properties::ContentType:
Expand Down
3 changes: 2 additions & 1 deletion qospacketqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,13 @@ void QoSPublishQueue::addToHeadOfLinkedList(std::shared_ptr<QueuedPublish> &qp)
* subscribe, which an offline client can't do. However, MQTT5 introduces 'retained as published', so it becomes valid. Bridge
* mode uses this as well.
*/
void QoSPublishQueue::queuePublish(PublishCopyFactory &copyFactory, uint16_t id, uint8_t new_max_qos, bool retainAsPublished)
void QoSPublishQueue::queuePublish(PublishCopyFactory &copyFactory, uint16_t id, uint8_t new_max_qos, bool retainAsPublished, const uint32_t subscriptionIdentifier)
{
assert(new_max_qos > 0);
assert(id > 0);

Publish pub = copyFactory.getNewPublish(new_max_qos, retainAsPublished);
pub.subscriptionIdentifier = subscriptionIdentifier;
std::shared_ptr<QueuedPublish> qp = std::make_shared<QueuedPublish>(std::move(pub), id);
addToHeadOfLinkedList(qp);
qosQueueBytes += qp->getApproximateMemoryFootprint();
Expand Down
2 changes: 1 addition & 1 deletion qospacketqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class QoSPublishQueue
bool erase(const uint16_t packet_id);
size_t size() const;
size_t getByteSize() const;
void queuePublish(PublishCopyFactory &copyFactory, uint16_t id, uint8_t new_max_qos, bool retainAsPublished);
void queuePublish(PublishCopyFactory &copyFactory, uint16_t id, uint8_t new_max_qos, bool retainAsPublished, const uint32_t subscriptionIdentifier);
void queuePublish(Publish &&pub, uint16_t id);
int clearExpiredMessages();
const std::shared_ptr<QueuedPublish> &getTail() const;
Expand Down
4 changes: 2 additions & 2 deletions session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ PacketDropReason Session::writePacket(PublishCopyFactory &copyFactory, const uin
pack_id = getNextPacketId();

if (!destroyOnDisconnect)
qosPacketQueue.queuePublish(copyFactory, pack_id, effectiveQos, effectiveRetain); // TODO: here subscription identifier
qosPacketQueue.queuePublish(copyFactory, pack_id, effectiveQos, effectiveRetain, subscriptionIdentifier);
}

PacketDropReason return_value = PacketDropReason::ClientOffline;
Expand Down Expand Up @@ -289,7 +289,7 @@ void Session::sendAllPendingQosData()
{
PublishCopyFactory fac(&p.first);
const bool retain = !c->isRetainedAvailable() ? false : p.first.retain;
c->writeMqttPacketAndBlameThisClient(fac, p.first.qos, p.second, retain, 0); // TODO: subscription identifiers
c->writeMqttPacketAndBlameThisClient(fac, p.first.qos, p.second, retain, p.first.subscriptionIdentifier);
}

for(uint16_t id : copiedQoS2Ids)
Expand Down

0 comments on commit 55594af

Please sign in to comment.