diff --git a/3rd-party/3rd-party.iml b/3rd-party/3rd-party.iml
index d1d2d7891..8021953ed 100644
--- a/3rd-party/3rd-party.iml
+++ b/3rd-party/3rd-party.iml
@@ -5,16 +5,5 @@
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/Cargo.lock b/Cargo.lock
index 4f602e1b8..37c851f7a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -22,6 +22,11 @@ dependencies = [
"memchr 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
+[[package]]
+name = "bitflags"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
[[package]]
name = "byteorder"
version = "0.5.3"
@@ -36,6 +41,25 @@ dependencies = [
"time 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)",
]
+[[package]]
+name = "foreign-types"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "gcc"
+version = "0.3.45"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "gdi32-sys"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
+ "winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
[[package]]
name = "idna"
version = "0.1.0"
@@ -138,6 +162,7 @@ dependencies = [
"chrono 0.2.25 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "openssl 0.9.9 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)",
"regex 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
@@ -161,6 +186,35 @@ dependencies = [
"timer 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
+[[package]]
+name = "openssl"
+version = "0.9.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "foreign-types 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "lazy_static 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "libc 0.2.19 (registry+https://github.com/rust-lang/crates.io-index)",
+ "openssl-sys 0.9.9 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "openssl-sys"
+version = "0.9.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "gcc 0.3.45 (registry+https://github.com/rust-lang/crates.io-index)",
+ "gdi32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "libc 0.2.19 (registry+https://github.com/rust-lang/crates.io-index)",
+ "pkg-config 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
+ "user32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "pkg-config"
+version = "0.3.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
[[package]]
name = "quote"
version = "0.3.12"
@@ -310,6 +364,15 @@ dependencies = [
"matches 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
]
+[[package]]
+name = "user32-sys"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
+ "winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
[[package]]
name = "utf8-ranges"
version = "1.0.0"
@@ -340,8 +403,12 @@ dependencies = [
[metadata]
"checksum aho-corasick 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4f660b942762979b56c9f07b4b36bb559776fbad102f05d6771e1b629e8fd5bf"
+"checksum bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "aad18937a628ec6abcd26d1489012cc0e18c21798210f491af69ded9b881106d"
"checksum byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0fc10e8cc6b2580fda3f36eb6dc5316657f812a3df879a44a66fc9f0fdbc4855"
"checksum chrono 0.2.25 (registry+https://github.com/rust-lang/crates.io-index)" = "9213f7cd7c27e95c2b57c49f0e69b1ea65b27138da84a170133fd21b07659c00"
+"checksum foreign-types 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3e4056b9bd47f8ac5ba12be771f77a0dae796d1bbaaf5fd0b9c2d38b69b8a29d"
+"checksum gcc 0.3.45 (registry+https://github.com/rust-lang/crates.io-index)" = "40899336fb50db0c78710f53e87afc54d8c7266fb76262fecc78ca1a7f09deae"
+"checksum gdi32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0912515a8ff24ba900422ecda800b52f4016a56251922d397c576bf92c690518"
"checksum idna 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1053236e00ce4f668aeca4a769a09b3bf5a682d802abd6f3cb39374f6b162c11"
"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
"checksum lazy_static 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6abe0ee2e758cd6bc8a2cd56726359007748fbf4128da998b65d0b70f881e19b"
@@ -354,6 +421,9 @@ dependencies = [
"checksum num-integer 0.1.32 (registry+https://github.com/rust-lang/crates.io-index)" = "fb24d9bfb3f222010df27995441ded1e954f8f69cd35021f6bef02ca9552fb92"
"checksum num-iter 0.1.32 (registry+https://github.com/rust-lang/crates.io-index)" = "287a1c9969a847055e1122ec0ea7a5c5d6f72aad97934e131c83d5c08ab4e45c"
"checksum num-traits 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)" = "a16a42856a256b39c6d3484f097f6713e14feacd9bfb02290917904fae46c81c"
+"checksum openssl 0.9.9 (registry+https://github.com/rust-lang/crates.io-index)" = "d59714233ccf23bc962f5eddc5d5c551c5848400e5ab01c64dded1743f3e3784"
+"checksum openssl-sys 0.9.9 (registry+https://github.com/rust-lang/crates.io-index)" = "376c5c6084e5ea95eea9c3280801e46d0dcf51251d4f01b747e044fb64d1fb31"
+"checksum pkg-config 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "3a8b4c6b8165cd1a1cd4b9b120978131389f64bdaf456435caa41e630edba903"
"checksum quote 0.3.12 (registry+https://github.com/rust-lang/crates.io-index)" = "e7b44fd83db28b83c1c58187159934906e5e955c812e211df413b76b03c909a5"
"checksum rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "022e0636ec2519ddae48154b028864bdce4eaf7d35226ab8e65c611be97b189d"
"checksum redox_syscall 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "8dd35cc9a8bdec562c757e3d43c1526b5c6d2653e23e2315065bc25556550753"
@@ -373,6 +443,7 @@ dependencies = [
"checksum unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f860d7d29cf02cb2f3f359fd35991af3d30bac52c57d265a3c461074cb4dc"
"checksum unreachable 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1f2ae5ddb18e1c92664717616dd9549dde73f539f01bd7b77c2edb2446bdff91"
"checksum url 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f5ba8a749fb4479b043733416c244fa9d1d3af3d7c23804944651c8a448cb87e"
+"checksum user32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4ef4711d107b21b410a3a974b1204d9accc8b10dad75d8324b5d755de1617d47"
"checksum utf8-ranges 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "662fab6525a98beff2921d7f61a39e7d59e0b425ebc7d0d9e66d316e55124122"
"checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
"checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index fa7768295..d188ce608 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -11,4 +11,13 @@ rand = "0.3"
regex = "0.2.1"
lazy_static = "0.2"
-# openssl = "0.9.2"
+# OpenSSL is optional due to problems building and installing it on some platforms.
+openssl = { version = "0.9.9", optional = true }
+
+[features]
+# The default package is no crypto.
+default = []
+
+# Crypto relies on openssl which is a bit of a pain to use on some platforms. Build without if you have trouble with it
+# or simply don't need the functionality.
+crypto = ["openssl"]
\ No newline at end of file
diff --git a/core/src/crypto/mod.rs b/core/src/crypto/mod.rs
new file mode 100644
index 000000000..c95c17f76
--- /dev/null
+++ b/core/src/crypto/mod.rs
@@ -0,0 +1,8 @@
+//! Contains the wrappers for crypto for OPC UA.
+//!
+//! This file is an optional component of the stack. If it isn't compiled in, then the OPC UA
+//! impl will not support encryption, decryption, signing or verification.
+
+// TODO
+
+use openssl::*;
\ No newline at end of file
diff --git a/docs/docs.iml b/docs/docs.iml
index d1d2d7891..8021953ed 100644
--- a/docs/docs.iml
+++ b/docs/docs.iml
@@ -5,16 +5,5 @@
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/server/src/comms/tcp_transport.rs b/server/src/comms/tcp_transport.rs
index 6e14ea6ae..6c9d1398b 100644
--- a/server/src/comms/tcp_transport.rs
+++ b/server/src/comms/tcp_transport.rs
@@ -26,7 +26,7 @@ const SEND_BUFFER_SIZE: usize = 1024 * 64;
const MAX_MESSAGE_SIZE: usize = 1024 * 64;
// Rate at which subscriptions are serviced
-const SUBSCRIPTION_TIMER_RATE: i64 = 5000;
+const SUBSCRIPTION_TIMER_RATE: i64 = 500;
#[derive(Clone, Debug, PartialEq)]
pub enum TransportState {
diff --git a/server/src/services/monitored_item.rs b/server/src/services/monitored_item.rs
index 25aa64368..00c3138d0 100644
--- a/server/src/services/monitored_item.rs
+++ b/server/src/services/monitored_item.rs
@@ -42,11 +42,55 @@ impl MonitoredItemService {
Ok(SupportedMessage::CreateMonitoredItemsResponse(response))
}
- pub fn modify_monitored_items(&self, _: &mut ServerState, _: &mut SessionState, _: ModifyMonitoredItemsRequest) -> Result {
- unimplemented!();
+ pub fn modify_monitored_items(&self, _: &mut ServerState, session_state: &mut SessionState, request: ModifyMonitoredItemsRequest) -> Result {
+ let mut service_status = &GOOD;
+ let results = if let Some(ref items_to_modify) = request.items_to_modify {
+ // Find subscription and add items to it
+ let mut subscriptions = session_state.subscriptions.lock().unwrap();
+ let subscription_id = request.subscription_id;
+ if let Some(mut subscription) = subscriptions.get_mut(&subscription_id) {
+ Some(subscription.modify_monitored_items(items_to_modify))
+ } else {
+ // No matching subscription
+ service_status = &BAD_SUBSCRIPTION_ID_INVALID;
+ None
+ }
+ } else {
+ // No items to modify so nothing to do
+ service_status = &BAD_NOTHING_TO_DO;
+ None
+ };
+ let response = ModifyMonitoredItemsResponse {
+ response_header: ResponseHeader::new_service_result(&DateTime::now(), &request.request_header, service_status),
+ results: results,
+ diagnostic_infos: None
+ };
+ Ok(SupportedMessage::ModifyMonitoredItemsResponse(response))
}
- pub fn delete_monitored_items(&self, _: &mut ServerState, _: &mut SessionState, _: DeleteMonitoredItemsRequest) -> Result {
- unimplemented!();
+ pub fn delete_monitored_items(&self, _: &mut ServerState, session_state: &mut SessionState, request: DeleteMonitoredItemsRequest) -> Result {
+ let mut service_status = &GOOD;
+ let results = if let Some(ref items_to_delete) = request.monitored_item_ids {
+ // Find subscription and add items to it
+ let mut subscriptions = session_state.subscriptions.lock().unwrap();
+ let subscription_id = request.subscription_id;
+ if let Some(mut subscription) = subscriptions.get_mut(&subscription_id) {
+ Some(subscription.delete_monitored_items(items_to_delete))
+ } else {
+ // No matching subscription
+ service_status = &BAD_SUBSCRIPTION_ID_INVALID;
+ None
+ }
+ } else {
+ // No items to modify so nothing to do
+ service_status = &BAD_NOTHING_TO_DO;
+ None
+ };
+ let response = DeleteMonitoredItemsResponse {
+ response_header: ResponseHeader::new_service_result(&DateTime::now(), &request.request_header, service_status),
+ results: results,
+ diagnostic_infos: None
+ };
+ Ok(SupportedMessage::DeleteMonitoredItemsResponse(response))
}
}
\ No newline at end of file
diff --git a/server/src/session.rs b/server/src/session.rs
index d2c028c88..b3476c3b3 100644
--- a/server/src/session.rs
+++ b/server/src/session.rs
@@ -62,7 +62,7 @@ impl SessionState {
if subscription.state == SubscriptionState::Closed {
dead_subscriptions.push(*subscription_id);
} else {
- let (notification_message, publish_request_action) = subscription.tick(address_space, &publish_request, &now);
+ let (notification_message, publish_request_action, subscription_ack_results) = subscription.tick(address_space, &publish_request, &now);
if let Some(notification_message) = notification_message {
let publish_response = PublishResponse {
response_header: ResponseHeader::new_notification_response(&DateTime::now(), &GOOD),
@@ -97,12 +97,7 @@ impl SessionState {
self.publish_request_queue.push(publish_request.unwrap());
}
-
- if result.is_empty() {
- None
- } else {
- Some(result)
- }
+ if result.is_empty() { None } else { Some(result) }
}
@@ -117,10 +112,12 @@ impl SessionState {
} else {
MAX_REQUEST_TIMEOUT
};
- let timeout = time::Duration::milliseconds(timeout);
+ let timeout_d = time::Duration::milliseconds(timeout);
// The request has timed out if the timestamp plus hint exceeds the input time
- if timestamp + timeout >= *now {
+ let expiration_time = timestamp + timeout_d;
+ if *now >= expiration_time {
+ debug!("Publish request {} has expired - timestamp = {:?}, expiration hint = {}, expiration time = {:?}, time now = {:?}, ", r.request_header.request_handle, timestamp, timeout, expiration_time, now);
let now = DateTime::from_chrono(now);
let response = PublishResponse {
response_header: ResponseHeader::new_service_result(&now, &r.request_header, &BAD_REQUEST_TIMEOUT),
diff --git a/server/src/subscriptions/subscription.rs b/server/src/subscriptions/subscription.rs
index 08fb2ece1..f5f985bf4 100644
--- a/server/src/subscriptions/subscription.rs
+++ b/server/src/subscriptions/subscription.rs
@@ -204,7 +204,7 @@ impl Subscription {
/// Checks the subscription and monitored items for state change, messages. If the tick does
/// nothing, the function returns None. Otherwise it returns one or more messages in an Vec.
- pub fn tick(&mut self, address_space: &AddressSpace, publish_request: &Option, now: &DateTimeUTC) -> (Option, PublishRequestAction) {
+ pub fn tick(&mut self, address_space: &AddressSpace, publish_request: &Option, now: &DateTimeUTC) -> (Option, PublishRequestAction, Option>) {
debug!("subscription tick {}", self.subscription_id);
// Test if the interval has elapsed.
@@ -226,15 +226,15 @@ impl Subscription {
// If items have changed or subscription interval elapsed then we may have notifications
// to send or state to update
let result = if items_changed || publishing_timer_expired {
- let (_, update_state_action, publish_request_action) = self.update_state(publish_request, publishing_timer_expired);
+ let (_, update_state_action, publish_request_action, subscription_ack_results) = self.update_state(publish_request, publishing_timer_expired);
let notifications = match update_state_action {
UpdateStateAction::None => None,
UpdateStateAction::ReturnKeepAlive => self.return_keep_alive(),
UpdateStateAction::ReturnNotifications => self.return_notifications(),
};
- (notifications, publish_request_action)
+ (notifications, publish_request_action, Some(subscription_ack_results))
} else {
- (None, PublishRequestAction::None)
+ (None, PublishRequestAction::None, None)
};
// Check if the subscription interval has been exceeded since last call
@@ -292,9 +292,12 @@ impl Subscription {
// * Update state action - none, return notifications, return keep alive
// * Publishing request action - nothing, dequeue
//
- pub fn update_state(&mut self, publish_request: &Option, publishing_timer_expired: bool) -> (u8, UpdateStateAction, PublishRequestAction) {
+ pub fn update_state(&mut self, publish_request: &Option, publishing_timer_expired: bool) -> (u8, UpdateStateAction, PublishRequestAction, Vec) {
let receive_publish_request = publish_request.is_some();
+ // Acknowledgements to requests
+ let mut subscription_ack_results: Vec = Vec::new();
+
// This is a state engine derived from OPC UA Part 4 Publish service and might look a
// little odd for that.
//
@@ -310,7 +313,7 @@ impl Subscription {
// if receive_create_subscription {
// self.state = Subscription::Creating;
// }
- return (1, UpdateStateAction::None, PublishRequestAction::None);
+ return (1, UpdateStateAction::None, PublishRequestAction::None, subscription_ack_results);
}
SubscriptionState::Creating => {
// State #2
@@ -320,92 +323,91 @@ impl Subscription {
// State #3
self.state = SubscriptionState::Normal;
self.message_sent = false;
- return (3, UpdateStateAction::None, PublishRequestAction::None);
+ return (3, UpdateStateAction::None, PublishRequestAction::None, subscription_ack_results);
}
SubscriptionState::Normal => {
if receive_publish_request && (!self.publishing_enabled || (self.publishing_enabled && !self.more_notifications)) {
// State #4
- let publish_request = publish_request.as_ref().unwrap();
- self.delete_acked_notification_msgs(publish_request);
- return (4, UpdateStateAction::None, PublishRequestAction::None);
+ self.delete_acked_notification_msgs(publish_request.as_ref().unwrap(), &mut subscription_ack_results);
+ return (4, UpdateStateAction::None, PublishRequestAction::None, subscription_ack_results);
} else if receive_publish_request && self.publishing_enabled && self.more_notifications {
// State #5
self.reset_lifetime_counter();
- self.delete_acked_notification_msgs(publish_request.as_ref().unwrap());
+ self.delete_acked_notification_msgs(publish_request.as_ref().unwrap(), &mut subscription_ack_results);
self.message_sent = true;
- return (5, UpdateStateAction::ReturnNotifications, PublishRequestAction::None);
+ return (5, UpdateStateAction::ReturnNotifications, PublishRequestAction::None, subscription_ack_results);
} else if publishing_timer_expired && self.publishing_req_queued && self.publishing_enabled && self.notifications_available {
// State #6
self.reset_lifetime_counter();
self.start_publishing_timer();
self.message_sent = true;
- return (6, UpdateStateAction::ReturnNotifications, PublishRequestAction::Dequeue);
+ return (6, UpdateStateAction::ReturnNotifications, PublishRequestAction::Dequeue, subscription_ack_results);
} else if publishing_timer_expired && self.publishing_req_queued && !self.message_sent && (!self.publishing_enabled || (self.publishing_enabled && !self.notifications_available)) {
// State #7
self.reset_lifetime_counter();
self.start_publishing_timer();
self.message_sent = true;
- return (7, UpdateStateAction::ReturnKeepAlive, PublishRequestAction::Dequeue);
+ return (7, UpdateStateAction::ReturnKeepAlive, PublishRequestAction::Dequeue, subscription_ack_results);
} else if publishing_timer_expired && !self.publishing_req_queued && (!self.message_sent || (self.publishing_enabled && self.notifications_available)) {
// State #8
self.start_publishing_timer();
self.state = SubscriptionState::Late;
- return (8, UpdateStateAction::None, PublishRequestAction::None);
+ return (8, UpdateStateAction::None, PublishRequestAction::None, subscription_ack_results);
} else if publishing_timer_expired && self.message_sent && (!self.publishing_enabled || (self.publishing_enabled && !self.notifications_available)) {
// State #9
self.start_publishing_timer();
self.reset_keep_alive_counter();
self.state = SubscriptionState::KeepAlive;
- return (9, UpdateStateAction::None, PublishRequestAction::None);
+ return (9, UpdateStateAction::None, PublishRequestAction::None, subscription_ack_results);
}
}
SubscriptionState::Late => {
if receive_publish_request && self.publishing_enabled && (self.notifications_available || self.more_notifications) {
// State #10
self.reset_lifetime_counter();
- self.delete_acked_notification_msgs(publish_request.as_ref().unwrap());
+ self.delete_acked_notification_msgs(publish_request.as_ref().unwrap(), &mut subscription_ack_results);
self.state = SubscriptionState::Normal;
self.message_sent = true;
- return (10, UpdateStateAction::ReturnNotifications, PublishRequestAction::None);
+ return (10, UpdateStateAction::ReturnNotifications, PublishRequestAction::None, subscription_ack_results);
} else if receive_publish_request && (!self.publishing_enabled || (self.publishing_enabled && !self.notifications_available && !self.more_notifications)) {
// State #11
self.reset_lifetime_counter();
- self.delete_acked_notification_msgs(publish_request.as_ref().unwrap());
+ self.delete_acked_notification_msgs(publish_request.as_ref().unwrap(), &mut subscription_ack_results);
self.state = SubscriptionState::KeepAlive;
self.message_sent = true;
- return (11, UpdateStateAction::ReturnKeepAlive, PublishRequestAction::None);
+ return (11, UpdateStateAction::ReturnKeepAlive, PublishRequestAction::None, subscription_ack_results);
} else if publishing_timer_expired {
// State #12
self.start_publishing_timer();
- return (12, UpdateStateAction::None, PublishRequestAction::None);
+ return (12, UpdateStateAction::None, PublishRequestAction::None, subscription_ack_results);
}
}
SubscriptionState::KeepAlive => {
if receive_publish_request {
// State #13
- self.delete_acked_notification_msgs(publish_request.as_ref().unwrap());
- return (13, UpdateStateAction::None, PublishRequestAction::None);
+ self.delete_acked_notification_msgs(publish_request.as_ref().unwrap(), &mut subscription_ack_results);
+ return (13, UpdateStateAction::None, PublishRequestAction::None, subscription_ack_results);
} else if publishing_timer_expired && self.publishing_enabled && self.notifications_available && self.publishing_req_queued {
// State #14
self.message_sent = true;
self.state = SubscriptionState::Normal;
- return (14, UpdateStateAction::ReturnNotifications, PublishRequestAction::Dequeue);
+ return (14, UpdateStateAction::ReturnNotifications, PublishRequestAction::Dequeue, subscription_ack_results);
} else if publishing_timer_expired && self.publishing_req_queued && self.keep_alive_counter == 1 &&
!self.publishing_enabled || (self.publishing_enabled && self.notifications_available) {
// State #15
self.start_publishing_timer();
self.reset_keep_alive_counter();
- return (15, UpdateStateAction::ReturnKeepAlive, PublishRequestAction::Dequeue);
+ return (15, UpdateStateAction::ReturnKeepAlive, PublishRequestAction::Dequeue, subscription_ack_results);
} else if publishing_timer_expired && self.keep_alive_counter > 1 && (!self.publishing_enabled || (self.publishing_enabled && !self.notifications_available)) {
// State #16
self.start_publishing_timer();
self.keep_alive_counter -= 1;
- return (16, UpdateStateAction::None, PublishRequestAction::None);
+ return (16, UpdateStateAction::None, PublishRequestAction::None, subscription_ack_results);
} else if publishing_timer_expired && !self.publishing_req_queued && (self.keep_alive_counter == 1 || (self.keep_alive_counter > 1 && self.publishing_enabled && self.notifications_available)) {
// State #17
self.start_publishing_timer();
self.state = SubscriptionState::Late;
- return (17, UpdateStateAction::None, PublishRequestAction::None);
+ return (17, UpdateStateAction::None, PublishRequestAction::None, subscription_ack_results);
}
}
}
@@ -429,7 +431,7 @@ impl Subscription {
}
}
- (0, UpdateStateAction::None, PublishRequestAction::None)
+ (0, UpdateStateAction::None, PublishRequestAction::None, subscription_ack_results)
}
/// Deletes the acknowledged notifications, returning a list of status code for each according
@@ -438,10 +440,8 @@ impl Subscription {
/// GOOD - deleted notification
/// BAD_SUBSCRIPTION_ID_INVALID - Subscription doesn't exist
/// BAD_SEQUENCE_NUMBER_UNKNOWN - Sequence number doesn't exist
- pub fn delete_acked_notification_msgs(&mut self, request: &PublishRequest) -> Option> {
- if request.subscription_acknowledgements.is_none() {
- None
- } else {
+ pub fn delete_acked_notification_msgs(&mut self, request: &PublishRequest, results: &mut Vec) {
+ if request.subscription_acknowledgements.is_some() {
let mut results = Vec::new();
let subscription_acknowledgements = request.subscription_acknowledgements.as_ref().unwrap();
for ack in subscription_acknowledgements {
@@ -458,7 +458,6 @@ impl Subscription {
};
results.push(result.clone());
}
- Some(results)
}
}
diff --git a/server/src/tests/mod.rs b/server/src/tests/mod.rs
index e95b5eb01..6832b9cf7 100644
--- a/server/src/tests/mod.rs
+++ b/server/src/tests/mod.rs
@@ -54,7 +54,7 @@ pub fn server_config_save() {
#[test]
pub fn expired_publish_requests() {
let now = chrono::UTC::now();
- let now_plus_30s = now + time::Duration::seconds(30i64);
+ let now_plus_30s = now + time::Duration::seconds(30);
// Create two publish requests timestamped now, one which expires in > 30s, one which expires
// in > 20s
@@ -63,7 +63,7 @@ pub fn expired_publish_requests() {
request_header: RequestHeader::new(&NodeId::null(), &now, 1000),
subscription_acknowledgements: None,
};
- pr1.request_header.timeout_hint = 30001;
+ pr1.request_header.timeout_hint = 31000;
let mut pr2 = PublishRequest {
request_header: RequestHeader::new(&NodeId::null(), &now, 2000),
diff --git a/server/src/tests/subscription.rs b/server/src/tests/subscription.rs
index a2b3a2dd9..1be44070d 100644
--- a/server/src/tests/subscription.rs
+++ b/server/src/tests/subscription.rs
@@ -30,7 +30,7 @@ fn update_state_3() {
// Test #3 - state changes from Creating -> Normal
let publishing_timer_expired = false;
- let (handled_state, update_state_action, publish_request_action) = s.update_state(&publish_request, publishing_timer_expired);
+ let (handled_state, update_state_action, publish_request_action, _) = s.update_state(&publish_request, publishing_timer_expired);
assert_eq!(handled_state, 3);
assert_eq!(update_state_action, UpdateStateAction::None);
@@ -60,7 +60,7 @@ fn update_state_4() {
s.publishing_enabled = false;
let publishing_timer_expired = false;
- let (handled_state, update_state_action, publish_request_action) = s.update_state(&publish_request, publishing_timer_expired);
+ let (handled_state, update_state_action, publish_request_action, _) = s.update_state(&publish_request, publishing_timer_expired);
assert_eq!(handled_state, 4);
assert_eq!(update_state_action, UpdateStateAction::None);
@@ -90,7 +90,7 @@ fn update_state_5() {
s.lifetime_counter = 1;
let publishing_timer_expired = false;
- let (handled_state, update_state_action, publish_request_action) = s.update_state(&publish_request, publishing_timer_expired);
+ let (handled_state, update_state_action, publish_request_action, _) = s.update_state(&publish_request, publishing_timer_expired);
assert_eq!(handled_state, 5);
assert_eq!(update_state_action, UpdateStateAction::ReturnNotifications);
@@ -118,7 +118,7 @@ fn update_state_6() {
s.publishing_req_queued = true;
let publishing_timer_expired = true;
- let (handled_state, update_state_action, publish_request_action) = s.update_state(&publish_request, publishing_timer_expired);
+ let (handled_state, update_state_action, publish_request_action, _) = s.update_state(&publish_request, publishing_timer_expired);
// ensure 6
assert_eq!(handled_state, 6);
@@ -144,7 +144,7 @@ fn update_state_7() {
s.publishing_enabled = true;
let publishing_timer_expired = true;
- let (handled_state, update_state_action, publish_request_action) = s.update_state(&publish_request, publishing_timer_expired);
+ let (handled_state, update_state_action, publish_request_action, _) = s.update_state(&publish_request, publishing_timer_expired);
assert_eq!(handled_state, 7);
assert_eq!(update_state_action, UpdateStateAction::ReturnKeepAlive);
@@ -169,7 +169,7 @@ fn update_state_8() {
s.message_sent = false;
let publishing_timer_expired = true;
- let (handled_state, update_state_action, publish_request_action) = s.update_state(&publish_request, publishing_timer_expired);
+ let (handled_state, update_state_action, publish_request_action, _) = s.update_state(&publish_request, publishing_timer_expired);
assert_eq!(handled_state, 8);
assert_eq!(update_state_action, UpdateStateAction::None);
@@ -192,7 +192,7 @@ fn update_state_9() {
s.keep_alive_counter = 3;
let publishing_timer_expired = true;
- let (handled_state, update_state_action, publish_request_action) = s.update_state(&publish_request, publishing_timer_expired);
+ let (handled_state, update_state_action, publish_request_action, _) = s.update_state(&publish_request, publishing_timer_expired);
assert_eq!(handled_state, 9);
assert_eq!(update_state_action, UpdateStateAction::None);
@@ -211,7 +211,7 @@ fn update_state_10() {
s.notifications_available = true;
let publishing_timer_expired = true;
- let (handled_state, update_state_action, publish_request_action) = s.update_state(&publish_request, publishing_timer_expired);
+ let (handled_state, update_state_action, publish_request_action, _) = s.update_state(&publish_request, publishing_timer_expired);
assert_eq!(handled_state, 10);
assert_eq!(update_state_action, UpdateStateAction::ReturnNotifications);
@@ -230,7 +230,7 @@ fn update_state_11() {
s.notifications_available = false;
let publishing_timer_expired = true;
- let (handled_state, update_state_action, publish_request_action) = s.update_state(&publish_request, publishing_timer_expired);
+ let (handled_state, update_state_action, publish_request_action, _) = s.update_state(&publish_request, publishing_timer_expired);
assert_eq!(handled_state, 11);
assert_eq!(update_state_action, UpdateStateAction::ReturnKeepAlive);
@@ -249,7 +249,7 @@ fn update_state_12() {
s.notifications_available = false;
let publishing_timer_expired = true;
- let (handled_state, update_state_action, publish_request_action) = s.update_state(&publish_request, publishing_timer_expired);
+ let (handled_state, update_state_action, publish_request_action, _) = s.update_state(&publish_request, publishing_timer_expired);
assert_eq!(handled_state, 12);
assert_eq!(update_state_action, UpdateStateAction::None);
@@ -264,7 +264,7 @@ fn update_state_13() {
let publish_request = Some(make_publish_request());
let publishing_timer_expired = false;
- let (handled_state, update_state_action, publish_request_action) = s.update_state(&publish_request, publishing_timer_expired);
+ let (handled_state, update_state_action, publish_request_action, _) = s.update_state(&publish_request, publishing_timer_expired);
assert_eq!(handled_state, 13);
assert_eq!(update_state_action, UpdateStateAction::None);
@@ -283,7 +283,7 @@ fn update_state_14() {
s.notifications_available = true;
s.publishing_req_queued = true;
- let (handled_state, update_state_action, publish_request_action) = s.update_state(&publish_request, publishing_timer_expired);
+ let (handled_state, update_state_action, publish_request_action, _) = s.update_state(&publish_request, publishing_timer_expired);
assert_eq!(handled_state, 14);
assert_eq!(update_state_action, UpdateStateAction::ReturnNotifications);
@@ -302,7 +302,7 @@ fn update_state_15() {
s.keep_alive_counter = 1;
s.publishing_enabled = false;
- let (handled_state, update_state_action, publish_request_action) = s.update_state(&publish_request, publishing_timer_expired);
+ let (handled_state, update_state_action, publish_request_action, _) = s.update_state(&publish_request, publishing_timer_expired);
assert_eq!(handled_state, 15);
assert_eq!(update_state_action, UpdateStateAction::ReturnKeepAlive);
@@ -321,7 +321,7 @@ fn update_state_16() {
s.keep_alive_counter = 5;
s.publishing_enabled = false;
- let (handled_state, update_state_action, publish_request_action) = s.update_state(&publish_request, publishing_timer_expired);
+ let (handled_state, update_state_action, publish_request_action, _) = s.update_state(&publish_request, publishing_timer_expired);
assert_eq!(handled_state, 16);
assert_eq!(update_state_action, UpdateStateAction::None);
@@ -340,7 +340,7 @@ fn update_state_17() {
s.publishing_req_queued = false;
s.keep_alive_counter = 1;
- let (handled_state, update_state_action, publish_request_action) = s.update_state(&publish_request, publishing_timer_expired);
+ let (handled_state, update_state_action, publish_request_action, _) = s.update_state(&publish_request, publishing_timer_expired);
assert_eq!(handled_state, 17);
assert_eq!(update_state_action, UpdateStateAction::None);
diff --git a/tools/tools.iml b/tools/tools.iml
index d1d2d7891..8021953ed 100644
--- a/tools/tools.iml
+++ b/tools/tools.iml
@@ -5,16 +5,5 @@
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file