From 4cb7ab57ef2e27b02d33cb94d6f5e92fa05d8af6 Mon Sep 17 00:00:00 2001 From: Adam Lock Date: Wed, 3 Apr 2019 22:35:23 +0100 Subject: [PATCH 1/9] Changes to simplify subscriptions which is broken --- server/src/subscriptions/subscription.rs | 279 ++++++++++-------- server/src/subscriptions/subscriptions.rs | 64 ++-- server/src/tests/mod.rs | 41 ++- server/src/tests/services/monitored_item.rs | 10 +- server/src/tests/services/subscription.rs | 30 +- .../src/tests/subscriptions/subscription.rs | 4 +- 6 files changed, 240 insertions(+), 188 deletions(-) diff --git a/server/src/subscriptions/subscription.rs b/server/src/subscriptions/subscription.rs index 89611f684..c91a45fe7 100644 --- a/server/src/subscriptions/subscription.rs +++ b/server/src/subscriptions/subscription.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, BTreeSet}; +use std::collections::{HashMap, BTreeSet, VecDeque}; use std::sync::{Arc, RwLock}; use chrono; @@ -42,9 +42,14 @@ pub(crate) struct SubscriptionStateParams { #[derive(Debug, Copy, Clone, PartialEq)] pub enum UpdateStateAction { None, + // Return a keep alive ReturnKeepAlive, + // Return notifications ReturnNotifications, - CloseExpiredSubscription, + // The subscription was created normally + SubscriptionCreated, + // The subscription has expired and must be closed + SubscriptionExpired, } /// This is for debugging purposes. It allows the caller to validate the output state if required. @@ -134,6 +139,9 @@ pub struct Subscription { next_monitored_item_id: u32, // The time that the subscription interval last fired last_timer_expired_time: DateTimeUtc, + // Currently outstanding notifications to send + #[serde(skip)] + notifications: VecDeque, /// Server diagnostics to track creation / destruction / modification of the subscription #[serde(skip)] diagnostics: Arc>, @@ -171,6 +179,7 @@ impl Subscription { sequence_number: Handle::new(1), next_monitored_item_id: 1, last_timer_expired_time: chrono::Utc::now(), + notifications: VecDeque::with_capacity(100), diagnostics, diagnostics_on_drop: true, }; @@ -181,6 +190,10 @@ impl Subscription { subscription } + pub(crate) fn ready_to_remove(&self) -> bool { + self.state == SubscriptionState::Closed && self.notifications.is_empty() + } + /// Creates monitored items on the specified subscription, returning the creation results pub fn create_monitored_items(&mut self, timestamps_to_return: TimestampsToReturn, items_to_create: &[MonitoredItemCreateRequest]) -> Vec { self.reset_lifetime_counter(); @@ -286,9 +299,9 @@ impl Subscription { self.resend_data = true; } - /// Checks the subscription and monitored items for state change, messages. If the tick does - /// nothing, the function returns None. Otherwise it returns one or more messages in an Vec. - pub(crate) fn tick(&mut self, address_space: &AddressSpace, tick_reason: TickReason, publishing_req_queued: bool, now: &DateTimeUtc) -> Option { + /// Checks the subscription and monitored items for state change, messages. Returns `true` + /// if there are zero or more notifications waiting to be processed. + pub(crate) fn tick(&mut self, address_space: &AddressSpace, tick_reason: TickReason, publishing_req_queued: bool, now: &DateTimeUtc) { // Check if the publishing interval has elapsed. Only checks on the tick timer. let publishing_interval_elapsed = match tick_reason { TickReason::ReceivedPublishRequest => false, @@ -312,7 +325,8 @@ impl Subscription { // Do a tick on monitored items. Note that monitored items normally update when the interval // elapses but they don't have to. So this is called every tick just to catch items with their // own intervals. - let (notification_message, more_notifications) = match self.state { + + let (notification, more_notifications) = match self.state { SubscriptionState::Closed | SubscriptionState::Creating => (None, false), _ => { let resend_data = self.resend_data; @@ -320,11 +334,12 @@ impl Subscription { } }; self.resend_data = false; - let notifications_available = notification_message.is_some(); + + let notifications_available = notification.is_some(); // If items have changed or subscription interval elapsed then we may have notifications // to send or state to update - let result = if notifications_available || publishing_interval_elapsed || publishing_req_queued { + if notifications_available || publishing_interval_elapsed || publishing_req_queued { // Update the internal state of the subscription based on what happened let update_state_result = self.update_state(tick_reason, SubscriptionStateParams { publishing_req_queued, @@ -337,149 +352,48 @@ impl Subscription { // Now act on the state's action match update_state_result.update_state_action { UpdateStateAction::None => { - if notifications_available { + if let Some(ref notification) = notification { // Reset the next sequence number to the discarded notification - let notification_sequence_number = notification_message.unwrap().sequence_number; + let notification_sequence_number = notification.sequence_number; self.sequence_number.set_next(notification_sequence_number); debug!("Notification message nr {} was being ignored for a do-nothing, update state was {:?}", notification_sequence_number, update_state_result); } // Send nothing //println!("do nothing {:?}", update_state_result.handled_state); - None } UpdateStateAction::ReturnKeepAlive => { - if notifications_available { + if let Some(ref notification) = notification { // Reset the next sequence number to the discarded notification - let notification_sequence_number = notification_message.unwrap().sequence_number; + let notification_sequence_number = notification.sequence_number; self.sequence_number.set_next(notification_sequence_number); debug!("Notification message nr {} was being ignored for a keep alive, update state was {:?}", notification_sequence_number, update_state_result); } // Send a keep alive debug!("Sending keep alive response"); - Some(NotificationMessage::keep_alive(self.sequence_number.next(), DateTime::from(now.clone()))) + self.notifications.push_back(NotificationMessage::keep_alive(self.sequence_number.next(), DateTime::from(now.clone()))); } UpdateStateAction::ReturnNotifications => { - // Send the notification message - debug!("Sending notification response"); - notification_message - } - UpdateStateAction::CloseExpiredSubscription => { - // Delete the monitored items, issue a status change for the subscription - self.monitored_items.clear(); - debug!("Deleting monitored items and issuing subscription status change"); - Some(NotificationMessage::status_change(self.sequence_number.next(), DateTime::from(now.clone()), StatusCode::BadTimeout)) - } - } - } else { - None - }; - - result - } - - /// Iterate through the monitored items belonging to the subscription, calling tick on each in turn. - /// - /// Items that are in a reporting state, or triggered to report will be have their pending notifications - /// collected together when the publish interval elapsed flag is `true`. - /// - /// The function returns a `notifications` and a `more_notifications` boolean to indicate if the notifications - /// are available. - fn tick_monitored_items(&mut self, address_space: &AddressSpace, now: &DateTimeUtc, publishing_interval_elapsed: bool, resend_data: bool) -> (Option, bool) { - let mut triggered_items: BTreeSet = BTreeSet::new(); - let mut notification_messages = Vec::new(); - - for (_, monitored_item) in &mut self.monitored_items { - // If this returns true then the monitored item wants to report its notification - let monitoring_mode = monitored_item.monitoring_mode(); - match monitored_item.tick(address_space, now, publishing_interval_elapsed, resend_data) { - TickResult::ReportValueChanged => { - // If this monitored item has triggered items, then they need to be handled - match monitoring_mode { - MonitoringMode::Reporting => { - // From triggering docs - // If the monitoring mode of the triggering item is REPORTING, then it is reported when the - // triggering item triggers the items to report. - monitored_item.triggered_items().iter().for_each(|i| { - triggered_items.insert(*i); - }) - } - _ => { - // Sampling should have gone in the other branch. Disabled shouldn't do anything. - panic!("How can there be changes to report when monitored item is in this monitoring mode {:?}", monitoring_mode); - } - } - if publishing_interval_elapsed { - // Take some / all of the monitored item's pending notifications - if let Some(mut item_notification_messages) = monitored_item.all_notification_messages() { - notification_messages.append(&mut item_notification_messages); - } + // Add the notification message to the queue + if let Some(notification) = notification { + self.notifications.push_back(notification); } } - TickResult::ValueChanged => { - // The monitored item doesn't have changes to report but its value did change so it - // is still necessary to check its triggered items. - match monitoring_mode { - MonitoringMode::Sampling => { - // If the monitoring mode of the triggering item is SAMPLING, then it is not reported when the - // triggering item triggers the items to report. - monitored_item.triggered_items().iter().for_each(|i| { - triggered_items.insert(*i); - }) - } - _ => { - // Reporting should have gone in the other branch. Disabled shouldn't do anything. - panic!("How can there be a value change when the mode is not sampling?"); - } - } + UpdateStateAction::SubscriptionCreated => { + // Subscription was created successfully + self.notifications.push_back(NotificationMessage::status_change(self.sequence_number.next(), DateTime::from(now.clone()), StatusCode::Good)) } - TickResult::NoChange => { - // Ignore + UpdateStateAction::SubscriptionExpired => { + // Delete the monitored items, issue a status change for the subscription + debug!("Subscription status change to closed / timeout"); + self.monitored_items.clear(); + self.notifications.push_back(NotificationMessage::status_change(self.sequence_number.next(), DateTime::from(now.clone()), StatusCode::BadTimeout)) } } } + } - // Are there any triggered items to force a change on? - triggered_items.iter().for_each(|i| { - if let Some(ref mut monitored_item) = self.monitored_items.get_mut(i) { - // Check the monitoring mode of the item to report - match monitored_item.monitoring_mode() { - MonitoringMode::Sampling => { - // If the monitoring mode of the item to report is SAMPLING, then it is reported when the - // triggering item triggers the i tems to report. - // - // Call with the resend_data flag as true to force the monitored item to - monitored_item.check_value(address_space, now, true); - if let Some(mut item_notification_messages) = monitored_item.all_notification_messages() { - notification_messages.append(&mut item_notification_messages); - } - } - MonitoringMode::Reporting => { - // If the monitoring mode of the item to report is REPORTING, this effectively causes the - // triggering item to be ignored. All notifications of the items to report are sent after the - // publishing interval expires. - // - // DO NOTHING - } - MonitoringMode::Disabled => { - // DO NOTHING - } - } - } else { - // It is possible that a monitored item contains a triggered id which has been deleted, so silently - // ignore that case. - } - }); - - if !notification_messages.is_empty() { - let next_sequence_number = self.sequence_number.next(); - debug!("Create notification for subscription {}, sequence number {}", self.subscription_id, next_sequence_number); - // Create a notification message and push it onto the queue - let notification = NotificationMessage::data_change(next_sequence_number, DateTime::now(), notification_messages); - // Advance next sequence number - (Some(notification), false) - } else { - (None, false) - } + pub(crate) fn take_notification(&mut self) -> Option { + self.notifications.pop_front() } // See OPC UA Part 4 5.13.1.2 State Table @@ -544,7 +458,7 @@ impl Subscription { if self.lifetime_counter == 1 { // State #27 self.state = SubscriptionState::Closed; - return UpdateStateResult::new(HandledState::Closed27, UpdateStateAction::CloseExpiredSubscription); + return UpdateStateResult::new(HandledState::Closed27, UpdateStateAction::SubscriptionExpired); } } _ => { @@ -560,7 +474,7 @@ impl Subscription { // State #3 self.state = SubscriptionState::Normal; self.message_sent = false; - return UpdateStateResult::new(HandledState::Create3, UpdateStateAction::None); + return UpdateStateResult::new(HandledState::Create3, UpdateStateAction::SubscriptionCreated); } SubscriptionState::Normal => { if tick_reason == TickReason::ReceivedPublishRequest && (!self.publishing_enabled || (self.publishing_enabled && !p.more_notifications)) { @@ -650,6 +564,111 @@ impl Subscription { UpdateStateResult::new(HandledState::None0, UpdateStateAction::None) } + /// Iterate through the monitored items belonging to the subscription, calling tick on each in turn. + /// + /// Items that are in a reporting state, or triggered to report will be have their pending notifications + /// collected together when the publish interval elapsed flag is `true`. + /// + /// The function returns a `notifications` and a `more_notifications` boolean to indicate if the notifications + /// are available. + fn tick_monitored_items(&mut self, address_space: &AddressSpace, now: &DateTimeUtc, publishing_interval_elapsed: bool, resend_data: bool) -> (Option, bool) { + let mut triggered_items: BTreeSet = BTreeSet::new(); + let mut notification_messages = Vec::new(); + + for (_, monitored_item) in &mut self.monitored_items { + // If this returns true then the monitored item wants to report its notification + let monitoring_mode = monitored_item.monitoring_mode(); + match monitored_item.tick(address_space, now, publishing_interval_elapsed, resend_data) { + TickResult::ReportValueChanged => { + // If this monitored item has triggered items, then they need to be handled + match monitoring_mode { + MonitoringMode::Reporting => { + // From triggering docs + // If the monitoring mode of the triggering item is REPORTING, then it is reported when the + // triggering item triggers the items to report. + monitored_item.triggered_items().iter().for_each(|i| { + triggered_items.insert(*i); + }) + } + _ => { + // Sampling should have gone in the other branch. Disabled shouldn't do anything. + panic!("How can there be changes to report when monitored item is in this monitoring mode {:?}", monitoring_mode); + } + } + if publishing_interval_elapsed { + // Take some / all of the monitored item's pending notifications + if let Some(mut item_notification_messages) = monitored_item.all_notification_messages() { + notification_messages.append(&mut item_notification_messages); + } + } + } + TickResult::ValueChanged => { + // The monitored item doesn't have changes to report but its value did change so it + // is still necessary to check its triggered items. + match monitoring_mode { + MonitoringMode::Sampling => { + // If the monitoring mode of the triggering item is SAMPLING, then it is not reported when the + // triggering item triggers the items to report. + monitored_item.triggered_items().iter().for_each(|i| { + triggered_items.insert(*i); + }) + } + _ => { + // Reporting should have gone in the other branch. Disabled shouldn't do anything. + panic!("How can there be a value change when the mode is not sampling?"); + } + } + } + TickResult::NoChange => { + // Ignore + } + } + } + + // Are there any triggered items to force a change on? + triggered_items.iter().for_each(|i| { + if let Some(ref mut monitored_item) = self.monitored_items.get_mut(i) { + // Check the monitoring mode of the item to report + match monitored_item.monitoring_mode() { + MonitoringMode::Sampling => { + // If the monitoring mode of the item to report is SAMPLING, then it is reported when the + // triggering item triggers the i tems to report. + // + // Call with the resend_data flag as true to force the monitored item to + monitored_item.check_value(address_space, now, true); + if let Some(mut item_notification_messages) = monitored_item.all_notification_messages() { + notification_messages.append(&mut item_notification_messages); + } + } + MonitoringMode::Reporting => { + // If the monitoring mode of the item to report is REPORTING, this effectively causes the + // triggering item to be ignored. All notifications of the items to report are sent after the + // publishing interval expires. + // + // DO NOTHING + } + MonitoringMode::Disabled => { + // DO NOTHING + } + } + } else { + // It is possible that a monitored item contains a triggered id which has been deleted, so silently + // ignore that case. + } + }); + + if !notification_messages.is_empty() { + let next_sequence_number = self.sequence_number.next(); + debug!("Create notification for subscription {}, sequence number {}", self.subscription_id, next_sequence_number); + // Create a notification message and push it onto the queue + let notification = NotificationMessage::data_change(next_sequence_number, DateTime::now(), notification_messages); + // Advance next sequence number + (Some(notification), false) + } else { + (None, false) + } + } + /// Reset the keep-alive counter to the maximum keep-alive count of the Subscription. /// The maximum keep-alive count is set by the Client when the Subscription is created /// and may be modified using the ModifySubscription Service diff --git a/server/src/subscriptions/subscriptions.rs b/server/src/subscriptions/subscriptions.rs index 9e412be8a..810bcd688 100644 --- a/server/src/subscriptions/subscriptions.rs +++ b/server/src/subscriptions/subscriptions.rs @@ -13,7 +13,7 @@ use crate::{ DateTimeUtc, subscriptions::{ PublishRequestEntry, PublishResponseEntry, - subscription::{Subscription, SubscriptionState, TickReason}, + subscription::{Subscription, TickReason}, }, }; @@ -28,9 +28,9 @@ use crate::{ /// client, or purged. pub(crate) struct Subscriptions { /// The publish request queue (requests by the client on the session) - pub(crate) publish_request_queue: VecDeque, + publish_request_queue: VecDeque, /// The publish response queue arranged oldest to latest - pub(crate) publish_response_queue: VecDeque, + publish_response_queue: VecDeque, // Timeout period for requests in ms publish_request_timeout: i64, /// Subscriptions associated with the session @@ -56,7 +56,17 @@ impl Subscriptions { } #[cfg(test)] - pub fn retransmission_queue(&mut self) -> &mut BTreeMap<(u32, u32), NotificationMessage> { + pub(crate) fn publish_request_queue(&mut self) -> &mut VecDeque { + &mut self.publish_request_queue + } + + #[cfg(test)] + pub(crate) fn publish_response_queue(&mut self) -> &mut VecDeque { + &mut self.publish_response_queue + } + + #[cfg(test)] + pub(crate) fn retransmission_queue(&mut self) -> &mut BTreeMap<(u32, u32), NotificationMessage> { &mut self.retransmission_queue } @@ -96,6 +106,9 @@ impl Subscriptions { } Err(StatusCode::BadTooManyPublishRequests) } else { + + + // Add to the front of the queue - older items are popped from the back self.publish_request_queue.push_front(PublishRequestEntry { request_id, @@ -158,27 +171,34 @@ impl Subscriptions { let subscription = self.subscriptions.get(&subscription_id).unwrap(); subscription.state() }; - if subscription_state == SubscriptionState::Closed { - // Subscription is dead so remove it - self.subscriptions.remove(&subscription_id); - } else { - let notification_message = { - let publishing_req_queued = !self.publish_request_queue.is_empty(); - let subscription = self.subscriptions.get_mut(&subscription_id).unwrap(); - // Now tick the subscription to see if it has any notifications. If there are - // notifications then the publish response will be associated with his subscription - // and ready to go. - subscription.tick(address_space, tick_reason, publishing_req_queued, now) - }; - if let Some(notification_message) = notification_message { - if self.publish_request_queue.is_empty() { - panic!("Should not be returning a notification message if there are no publish request to fill"); + + let publishing_req_queued = !self.publish_request_queue.is_empty(); + let subscription = self.subscriptions.get_mut(&subscription_id).unwrap(); + + // Now tick the subscription to see if it has any notifications. If there are + // notifications then the publish response will be associated with his subscription + // and ready to go. + subscription.tick(address_space, tick_reason, publishing_req_queued, now); + + // Process any notifications + loop { + if !self.publish_request_queue.is_empty() { + if let Some(notification_message) = subscription.take_notification() { + let publish_request = self.publish_request_queue.pop_back().unwrap(); + // Consume the publish request and queue the notification onto the transmission queue + self.transmission_queue.push_front((subscription_id, publish_request, notification_message)); + } else { + break; } - // Consume the publish request and queue the notification onto the transmission queue - let publish_request = self.publish_request_queue.pop_back().unwrap(); - self.transmission_queue.push_front((subscription_id, publish_request, notification_message)); + } else { + break; } } + + // Remove the subscription if it is done + if subscription.ready_to_remove() { + self.subscriptions.remove(&subscription_id); + } } // Iterate through notifications from oldest to latest in the transmission making publish diff --git a/server/src/tests/mod.rs b/server/src/tests/mod.rs index f1ea3505b..f91dcecb6 100644 --- a/server/src/tests/mod.rs +++ b/server/src/tests/mod.rs @@ -1,5 +1,4 @@ use std; -use std::collections::VecDeque; use std::path::PathBuf; use chrono; @@ -126,8 +125,10 @@ pub fn expired_publish_requests() { // Create session with publish requests let secure_channel: SecureChannel = (SecurityPolicy::None, MessageSecurityMode::None).into(); let mut session = Session::new_no_certificate_store(secure_channel); - session.subscriptions.publish_request_queue = { - let mut publish_request_queue = VecDeque::with_capacity(2); + + { + let publish_request_queue = session.subscriptions.publish_request_queue(); + publish_request_queue.clear(); publish_request_queue.push_back(pr1); publish_request_queue.push_back(pr2); publish_request_queue @@ -135,18 +136,30 @@ pub fn expired_publish_requests() { // Expire requests, see which expire session.expire_stale_publish_requests(&now_plus_5s); - let expired_responses = &session.subscriptions.publish_response_queue; + // The > 30s timeout hint request should be expired and the other should remain - assert_eq!(expired_responses.len(), 1); - assert_eq!(session.subscriptions.publish_request_queue.len(), 1); - assert_eq!(session.subscriptions.publish_request_queue[0].request.request_header.request_handle, 1000); - - let r1 = &expired_responses[0]; - if let SupportedMessage::ServiceFault(ref response_header) = r1.response { - assert_eq!(response_header.response_header.request_handle, 2000); - assert_eq!(response_header.response_header.service_result, StatusCode::BadTimeout); - } else { - panic!("Expected service faults for timed out publish requests") + + // Remain + { + let publish_request_queue = session.subscriptions.publish_request_queue(); + assert_eq!(publish_request_queue.len(), 1); + assert_eq!(publish_request_queue[0].request.request_header.request_handle, 1000); } + + // Expire + { + let publish_response_queue = session.subscriptions.publish_response_queue(); + assert_eq!(publish_response_queue.len(), 1); + + let r1 = &publish_response_queue[0]; + if let SupportedMessage::ServiceFault(ref response_header) = r1.response { + assert_eq!(response_header.response_header.request_handle, 2000); + assert_eq!(response_header.response_header.service_result, StatusCode::BadTimeout); + } else { + panic!("Expected service faults for timed out publish requests") + } + } + + } \ No newline at end of file diff --git a/server/src/tests/services/monitored_item.rs b/server/src/tests/services/monitored_item.rs index 166efdcfb..70170510b 100644 --- a/server/src/tests/services/monitored_item.rs +++ b/server/src/tests/services/monitored_item.rs @@ -87,14 +87,14 @@ fn publish_request(session: &mut Session, ss: &SubscriptionService) { subscription_acknowledgements: None, }; - session.subscriptions.publish_request_queue.clear(); + session.subscriptions.publish_request_queue().clear(); let response = ss.async_publish(session, request_id, &request).unwrap(); assert!(response.is_none()); - assert!(!session.subscriptions.publish_request_queue.is_empty()); + assert!(!session.subscriptions.publish_request_queue().is_empty()); } fn publish_response(session: &mut Session) -> PublishResponse { - let response = session.subscriptions.publish_response_queue.pop_back().unwrap().response; + let response = session.subscriptions.publish_response_queue().pop_back().unwrap().response; let response: PublishResponse = supported_message_as!(response, PublishResponse); response } @@ -103,7 +103,7 @@ fn publish_tick_no_response(session: &mut Session, ss: &SubscriptionService, add publish_request(session, ss); let now = now.add(duration); let _ = session.tick_subscriptions(&now, address_space, TickReason::TickTimerFired); - assert_eq!(session.subscriptions.publish_response_queue.len(), 0); + assert_eq!(session.subscriptions.publish_response_queue().len(), 0); now } @@ -115,7 +115,7 @@ fn publish_tick_response(session: &mut Session, ss: &SubscriptionService, add publish_request(session, ss); let now = now.add(duration); let _ = session.tick_subscriptions(&now, address_space, TickReason::TickTimerFired); - assert_eq!(session.subscriptions.publish_response_queue.len(), 1); + assert_eq!(session.subscriptions.publish_response_queue().len(), 1); let response = publish_response(session); handler(response); now diff --git a/server/src/tests/services/subscription.rs b/server/src/tests/services/subscription.rs index d17227499..a88c66f7b 100644 --- a/server/src/tests/services/subscription.rs +++ b/server/src/tests/services/subscription.rs @@ -110,18 +110,18 @@ fn publish_response_subscription() { let response = ss.async_publish(session, request_id, &request).unwrap(); assert!(response.is_none()); - assert!(!session.subscriptions.publish_request_queue.is_empty()); + assert!(!session.subscriptions.publish_request_queue().is_empty()); // Tick subscriptions to trigger a change let now = Utc::now().add(chrono::Duration::seconds(2)); let _ = session.tick_subscriptions(&now, &address_space, TickReason::TickTimerFired); // Ensure publish request was processed into a publish response - assert_eq!(session.subscriptions.publish_request_queue.len(), 0); - assert_eq!(session.subscriptions.publish_response_queue.len(), 1); + assert_eq!(session.subscriptions.publish_request_queue().len(), 0); + assert_eq!(session.subscriptions.publish_response_queue().len(), 1); // Get the response from the queue - let response = session.subscriptions.publish_response_queue.pop_back().unwrap().response; + let response = session.subscriptions.publish_response_queue().pop_back().unwrap().response; let response: PublishResponse = supported_message_as!(response, PublishResponse); debug!("PublishResponse {:#?}", response); @@ -154,7 +154,7 @@ fn publish_response_subscription() { assert_eq!(monitored_item_notification.client_handle, 0); // We expect the queue to be empty, because we got an immediate response - assert!(session.subscriptions.publish_response_queue.is_empty()); + assert!(session.subscriptions.publish_response_queue().is_empty()); }) } @@ -185,11 +185,11 @@ fn resend_data() { let _ = session.tick_subscriptions(&now, &address_space, TickReason::TickTimerFired); // Ensure publish request was processed into a publish response - assert_eq!(session.subscriptions.publish_request_queue.len(), 0); - assert_eq!(session.subscriptions.publish_response_queue.len(), 1); + assert_eq!(session.subscriptions.publish_request_queue().len(), 0); + assert_eq!(session.subscriptions.publish_response_queue().len(), 1); // Get the response from the queue - let response = session.subscriptions.publish_response_queue.pop_back().unwrap().response; + let response = session.subscriptions.publish_response_queue().pop_back().unwrap().response; let response: PublishResponse = supported_message_as!(response, PublishResponse); debug!("PublishResponse {:#?}", response); @@ -217,11 +217,11 @@ fn resend_data() { let _ = session.tick_subscriptions(&now, &address_space, TickReason::TickTimerFired); // Ensure publish request was processed into a publish response - assert_eq!(session.subscriptions.publish_request_queue.len(), 0); - assert_eq!(session.subscriptions.publish_response_queue.len(), 1); + assert_eq!(session.subscriptions.publish_request_queue().len(), 0); + assert_eq!(session.subscriptions.publish_response_queue().len(), 1); // Get the response from the queue - let response = session.subscriptions.publish_response_queue.pop_back().unwrap().response; + let response = session.subscriptions.publish_response_queue().pop_back().unwrap().response; let response: PublishResponse = supported_message_as!(response, PublishResponse); debug!("PublishResponse {:#?}", response); @@ -278,18 +278,18 @@ fn publish_keep_alive() { let response = ss.async_publish(session, request_id, &request).unwrap(); assert!(response.is_none()); - assert!(!session.subscriptions.publish_request_queue.is_empty()); + assert!(!session.subscriptions.publish_request_queue().is_empty()); // Tick subscriptions to trigger a change let now = Utc::now().add(chrono::Duration::seconds(2)); let _ = session.tick_subscriptions(&now, &address_space, TickReason::TickTimerFired); // Ensure publish request was processed into a publish response - assert_eq!(session.subscriptions.publish_request_queue.len(), 0); - assert_eq!(session.subscriptions.publish_response_queue.len(), 1); + assert_eq!(session.subscriptions.publish_request_queue().len(), 0); + assert_eq!(session.subscriptions.publish_response_queue().len(), 1); // Get the response from the queue - let response = session.subscriptions.publish_response_queue.pop_back().unwrap().response; + let response = session.subscriptions.publish_response_queue().pop_back().unwrap().response; let response: PublishResponse = supported_message_as!(response, PublishResponse); debug!("PublishResponse {:#?}", response); diff --git a/server/src/tests/subscriptions/subscription.rs b/server/src/tests/subscriptions/subscription.rs index 55311d2fd..6888c5347 100644 --- a/server/src/tests/subscriptions/subscription.rs +++ b/server/src/tests/subscriptions/subscription.rs @@ -40,7 +40,7 @@ fn update_state_3() { let update_state_result = s.update_state(tick_reason, p); assert_eq!(update_state_result.handled_state, HandledState::Create3); - assert_eq!(update_state_result.update_state_action, UpdateStateAction::None); + assert_eq!(update_state_result.update_state_action, UpdateStateAction::SubscriptionCreated); assert_eq!(s.state(), SubscriptionState::Normal); assert_eq!(s.message_sent(), false); } @@ -430,7 +430,7 @@ fn update_state_27() { let update_state_result = s.update_state(tick_reason, p); assert_eq!(update_state_result.handled_state, HandledState::Closed27); - assert_eq!(update_state_result.update_state_action, UpdateStateAction::CloseExpiredSubscription); + assert_eq!(update_state_result.update_state_action, UpdateStateAction::SubscriptionExpired); assert_eq!(s.state(), SubscriptionState::Closed); assert_eq!(s.lifetime_counter(), 1); assert_eq!(s.message_sent(), false); From 473a79ca5875c16df87e22735c48811c97832118 Mon Sep 17 00:00:00 2001 From: Adam Lock Date: Thu, 4 Apr 2019 23:49:02 +0100 Subject: [PATCH 2/9] Subscription refactor work --- client/src/session.rs | 175 +++++++++++++++++- server/src/subscriptions/mod.rs | 11 +- server/src/subscriptions/subscription.rs | 102 +++++----- server/src/subscriptions/subscriptions.rs | 14 +- server/src/tests/mod.rs | 4 +- .../src/tests/subscriptions/subscription.rs | 12 +- 6 files changed, 247 insertions(+), 71 deletions(-) diff --git a/client/src/session.rs b/client/src/session.rs index 781eac694..f979c7ad2 100644 --- a/client/src/session.rs +++ b/client/src/session.rs @@ -8,7 +8,7 @@ use std::{ result::Result, collections::HashSet, str::FromStr, - sync::{Arc, Mutex, RwLock}, + sync::{Arc, Mutex, RwLock, mpsc}, time::{Instant, Duration}, }; use futures::{ @@ -410,22 +410,43 @@ impl Session { self.transport.is_connected() } + const POLL_SLEEP_INTERVAL: u64 = 50; + /// Runs a polling loop for this session to perform periodic activity such as processing subscriptions, /// as well as recovering from connection errors. The run command will break if the session is disconnected /// and cannot be reestablished. /// /// # Returns /// - /// * `Ok(bool)` - returns `true` if an action was performed, `false` if no action was performed - /// but polling slept for a little bit. - /// * `Err(StatusCode)` - Request failed, status code is the reason for failure + /// * `mpsc::Sender<()>` - A sender that allows the caller to send a single unity message to the + /// run loop to cause it to abort. /// - pub fn run(session: Arc>) { - const POLL_SLEEP_INTERVAL: u64 = 50; + pub fn run(session: Arc>) -> mpsc::Sender<()> { + let (tx, rx) = mpsc::channel(); + Self::run_loop(session, Self::POLL_SLEEP_INTERVAL, rx); + tx + } + + /// Runs the server asynchronously by spawning a thread for it to run on, allowing the calling + /// thread to proceed to do other things. + pub fn run_async(session: Arc>) -> mpsc::Sender<()> { + let (tx, rx) = mpsc::channel(); + thread::spawn(move || { + Self::run_loop(session, Self::POLL_SLEEP_INTERVAL, rx) + }); + tx + } + + /// Main running loop for a session + pub fn run_loop(session: Arc>, sleep_interval: u64, rx: mpsc::Receiver<()>) { loop { // Main thread has nothing to do - just wait for publish events to roll in let mut session = session.write().unwrap(); - if session.poll(POLL_SLEEP_INTERVAL).is_err() { + if rx.try_recv().is_ok() { + info!("Run session was terminated by a message"); + break; + } + if session.poll(sleep_interval).is_err() { // Break the loop if connection goes down info!("Connection to server broke, so terminating"); break; @@ -944,7 +965,17 @@ impl Session { } } - /// Sends a ReadRequest to the server + /// Reads the value of nodes by sending a [`ReadRequest`] to the server. + /// + /// # Arguments + /// + /// * `nodes_to_read` - A list of `ReadValueId` to be read by the server. + /// + /// # Returns + /// + /// * `Ok(Vec)` - A list of datavalues corresponding to each read operation. + /// * `Err(StatusCode)` - Status code reason for failure. + /// pub fn read(&mut self, nodes_to_read: &[ReadValueId]) -> Result>, StatusCode> { if nodes_to_read.is_empty() { // No subscriptions @@ -970,7 +1001,17 @@ impl Session { } } - /// Sends a WriteRequest to the server + /// Writes values to nodes by sending a [`WriteRequest`] to the server. + /// + /// # Arguments + /// + /// * `nodes_to_write` - A list of `WriteValue` to be sent to the server. + /// + /// # Returns + /// + /// * `Ok(Vec)` - A list of results corresponding to each write operation. + /// * `Err(StatusCode)` - Status code reason for failure. + /// pub fn write_value(&mut self, nodes_to_write: &[WriteValue]) -> Result>, StatusCode> { if nodes_to_write.is_empty() { // No subscriptions @@ -993,6 +1034,122 @@ impl Session { } } + /// Add nodes by sending a [`AddNodesRequest`] to the server. + /// + /// # Arguments + /// + /// * `nodes_to_add` - A list of `AddNodesItem` to be added to the server. + /// + /// # Returns + /// + /// * `Ok(Vec)` - Result for each add node operation. + /// * `Err(StatusCode)` - Status code reason for failure. + /// + pub fn add_nodes(&mut self, nodes_to_add: &[AddNodesItem]) -> Result, StatusCode> { + if nodes_to_add.is_empty() { + error!("add_nodes, called with no nodes to add"); + Err(StatusCode::BadNothingToDo) + } else { + let request = AddNodesRequest { + request_header: self.make_request_header(), + nodes_to_add: Some(nodes_to_add.to_vec()), + }; + let response = self.send_request(request)?; + if let SupportedMessage::AddNodesResponse(response) = response { + Ok(response.results.unwrap()) + } else { + Err(crate::process_unexpected_response(response)) + } + } + } + + /// Add references by sending a [`AddReferencesRequest`] to the server. + /// + /// # Arguments + /// + /// * `references_to_add` - A list of `AddReferencesItem` to be sent to the server. + /// + /// # Returns + /// + /// * `Ok(Vec)` - Result for each add reference operation. + /// * `Err(StatusCode)` - Status code reason for failure. + /// + pub fn add_references(&mut self, references_to_add: &[AddReferencesItem]) -> Result, StatusCode> { + if references_to_add.is_empty() { + error!("add_references, called with no references to add"); + Err(StatusCode::BadNothingToDo) + } else { + let request = AddReferencesRequest { + request_header: self.make_request_header(), + references_to_add: Some(references_to_add.to_vec()), + }; + let response = self.send_request(request)?; + if let SupportedMessage::AddReferencesResponse(response) = response { + Ok(response.results.unwrap()) + } else { + Err(crate::process_unexpected_response(response)) + } + } + } + + /// Delete nodes by sending a [`DeleteNodesRequest`] to the server. + /// + /// # Arguments + /// + /// * `nodes_to_delete` - A list of `DeleteNodesItem` to be sent to the server. + /// + /// # Returns + /// + /// * `Ok(Vec)` - Result for each delete node operation. + /// * `Err(StatusCode)` - Status code reason for failure. + /// + pub fn delete_nodes(&mut self, nodes_to_delete: &[DeleteNodesItem]) -> Result, StatusCode> { + if nodes_to_delete.is_empty() { + error!("delete_nodes, called with no nodes to delete"); + Err(StatusCode::BadNothingToDo) + } else { + let request = DeleteNodesRequest { + request_header: self.make_request_header(), + nodes_to_delete: Some(nodes_to_delete.to_vec()), + }; + let response = self.send_request(request)?; + if let SupportedMessage::DeleteNodesResponse(response) = response { + Ok(response.results.unwrap()) + } else { + Err(crate::process_unexpected_response(response)) + } + } + } + + /// Delete references by sending a [`DeleteReferencesRequest`] to the server. + /// + /// # Arguments + /// + /// * `nodes_to_delete` - A list of `DeleteReferencesItem` to be sent to the server. + /// + /// # Returns + /// + /// * `Ok(Vec)` - Result for each delete node operation. + /// * `Err(StatusCode)` - Status code reason for failure. + /// + pub fn delete_references(&mut self, references_to_delete: &[DeleteReferencesItem]) -> Result, StatusCode> { + if references_to_delete.is_empty() { + error!("delete_references, called with no references to delete"); + Err(StatusCode::BadNothingToDo) + } else { + let request = DeleteReferencesRequest { + request_header: self.make_request_header(), + references_to_delete: Some(references_to_delete.to_vec()), + }; + let response = self.send_request(request)?; + if let SupportedMessage::DeleteReferencesResponse(response) = response { + Ok(response.results.unwrap()) + } else { + Err(crate::process_unexpected_response(response)) + } + } + } + /// Create a subscription by sending a [`CreateSubscriptionRequest`] to the server. /// /// # Arguments diff --git a/server/src/subscriptions/mod.rs b/server/src/subscriptions/mod.rs index c18d7b195..17e4f9bb2 100644 --- a/server/src/subscriptions/mod.rs +++ b/server/src/subscriptions/mod.rs @@ -1,5 +1,8 @@ -use opcua_types::SupportedMessage; -use opcua_types::service_types::PublishRequest; +use opcua_types::{ + SupportedMessage, + status_code::StatusCode, + service_types::PublishRequest, +}; /// The publish request entry preserves the request_id which is part of the chunk layer but clients /// are fickle about receiving responses from the same as the request. Normally this is easy because @@ -7,8 +10,12 @@ use opcua_types::service_types::PublishRequest; /// so that later we can send out responses that have the proper req id #[derive(Clone)] pub struct PublishRequestEntry { + // The request id pub request_id: u32, + // The request itself pub request: PublishRequest, + // The result of clearing acknowledgments when the request was received. + pub results: Option>, } #[derive(Clone, Debug, PartialEq)] diff --git a/server/src/subscriptions/subscription.rs b/server/src/subscriptions/subscription.rs index c91a45fe7..24c4ce62e 100644 --- a/server/src/subscriptions/subscription.rs +++ b/server/src/subscriptions/subscription.rs @@ -95,10 +95,19 @@ impl UpdateStateResult { #[derive(Debug, Copy, Clone, PartialEq)] pub(crate) enum TickReason { - ReceivedPublishRequest, + ReceivePublishRequest, +// PublishingTimerExpires, TickTimerFired, } +#[derive(Debug, Copy, Clone, PartialEq)] +pub(crate) enum StateEvent +{ + ReceivePublishRequest, + PublishingTimerExpires, +} + + #[derive(Debug, Clone, Serialize)] pub struct Subscription { /// Subscription id @@ -304,7 +313,7 @@ impl Subscription { pub(crate) fn tick(&mut self, address_space: &AddressSpace, tick_reason: TickReason, publishing_req_queued: bool, now: &DateTimeUtc) { // Check if the publishing interval has elapsed. Only checks on the tick timer. let publishing_interval_elapsed = match tick_reason { - TickReason::ReceivedPublishRequest => false, + TickReason::ReceivePublishRequest => false, TickReason::TickTimerFired => if self.state == SubscriptionState::Creating { true } else if self.publishing_interval <= 0f64 { @@ -335,7 +344,7 @@ impl Subscription { }; self.resend_data = false; - let notifications_available = notification.is_some(); + let notifications_available = !self.notifications.is_empty() || notification.is_some(); // If items have changed or subscription interval elapsed then we may have notifications // to send or state to update @@ -348,47 +357,50 @@ impl Subscription { publishing_timer_expired: publishing_interval_elapsed, }); trace!("subscription tick - update_state_result = {:?}", update_state_result); + self.handle_state_result(update_state_result, notification, now); + } + } - // Now act on the state's action - match update_state_result.update_state_action { - UpdateStateAction::None => { - if let Some(ref notification) = notification { - // Reset the next sequence number to the discarded notification - let notification_sequence_number = notification.sequence_number; - self.sequence_number.set_next(notification_sequence_number); - debug!("Notification message nr {} was being ignored for a do-nothing, update state was {:?}", notification_sequence_number, update_state_result); - } - // Send nothing - //println!("do nothing {:?}", update_state_result.handled_state); + fn handle_state_result(&mut self, update_state_result: UpdateStateResult, notification: Option, now: &DateTimeUtc) { + // Now act on the state's action + match update_state_result.update_state_action { + UpdateStateAction::None => { + if let Some(ref notification) = notification { + // Reset the next sequence number to the discarded notification + let notification_sequence_number = notification.sequence_number; + self.sequence_number.set_next(notification_sequence_number); + debug!("Notification message nr {} was being ignored for a do-nothing, update state was {:?}", notification_sequence_number, update_state_result); } - UpdateStateAction::ReturnKeepAlive => { - if let Some(ref notification) = notification { - // Reset the next sequence number to the discarded notification - let notification_sequence_number = notification.sequence_number; - self.sequence_number.set_next(notification_sequence_number); - debug!("Notification message nr {} was being ignored for a keep alive, update state was {:?}", notification_sequence_number, update_state_result); - } - // Send a keep alive - debug!("Sending keep alive response"); - self.notifications.push_back(NotificationMessage::keep_alive(self.sequence_number.next(), DateTime::from(now.clone()))); - } - UpdateStateAction::ReturnNotifications => { - // Add the notification message to the queue - if let Some(notification) = notification { - self.notifications.push_back(notification); - } - } - UpdateStateAction::SubscriptionCreated => { - // Subscription was created successfully - self.notifications.push_back(NotificationMessage::status_change(self.sequence_number.next(), DateTime::from(now.clone()), StatusCode::Good)) + // Send nothing + //println!("do nothing {:?}", update_state_result.handled_state); + } + UpdateStateAction::ReturnKeepAlive => { + if let Some(ref notification) = notification { + // Reset the next sequence number to the discarded notification + let notification_sequence_number = notification.sequence_number; + self.sequence_number.set_next(notification_sequence_number); + debug!("Notification message nr {} was being ignored for a keep alive, update state was {:?}", notification_sequence_number, update_state_result); } - UpdateStateAction::SubscriptionExpired => { - // Delete the monitored items, issue a status change for the subscription - debug!("Subscription status change to closed / timeout"); - self.monitored_items.clear(); - self.notifications.push_back(NotificationMessage::status_change(self.sequence_number.next(), DateTime::from(now.clone()), StatusCode::BadTimeout)) + // Send a keep alive + debug!("Sending keep alive response"); + self.notifications.push_back(NotificationMessage::keep_alive(self.sequence_number.next(), DateTime::from(now.clone()))); + } + UpdateStateAction::ReturnNotifications => { + // Add the notification message to the queue + if let Some(notification) = notification { + self.notifications.push_back(notification); } } + UpdateStateAction::SubscriptionCreated => { + // Subscription was created successfully + self.notifications.push_back(NotificationMessage::status_change(self.sequence_number.next(), DateTime::from(now.clone()), StatusCode::Good)) + } + UpdateStateAction::SubscriptionExpired => { + // Delete the monitored items, issue a status change for the subscription + debug!("Subscription status change to closed / timeout"); + self.monitored_items.clear(); + self.notifications.push_back(NotificationMessage::status_change(self.sequence_number.next(), DateTime::from(now.clone()), StatusCode::BadTimeout)) + } } } @@ -421,7 +433,7 @@ impl Subscription { pub(crate) fn update_state(&mut self, tick_reason: TickReason, p: SubscriptionStateParams) -> UpdateStateResult { // This function is called when a publish request is received OR the timer expired, so getting // both is invalid code somewhere - if tick_reason == TickReason::ReceivedPublishRequest && p.publishing_timer_expired { + if tick_reason == TickReason::ReceivePublishRequest && p.publishing_timer_expired { panic!("Should not be possible for timer to have expired and received publish request at same time") } @@ -477,10 +489,10 @@ impl Subscription { return UpdateStateResult::new(HandledState::Create3, UpdateStateAction::SubscriptionCreated); } SubscriptionState::Normal => { - if tick_reason == TickReason::ReceivedPublishRequest && (!self.publishing_enabled || (self.publishing_enabled && !p.more_notifications)) { + if tick_reason == TickReason::ReceivePublishRequest && (!self.publishing_enabled || (self.publishing_enabled && !p.more_notifications)) { // State #4 return UpdateStateResult::new(HandledState::Normal4, UpdateStateAction::None); - } else if tick_reason == TickReason::ReceivedPublishRequest && self.publishing_enabled && p.more_notifications { + } else if tick_reason == TickReason::ReceivePublishRequest && self.publishing_enabled && p.more_notifications { // State #5 self.reset_lifetime_counter(); self.message_sent = true; @@ -511,13 +523,13 @@ impl Subscription { } } SubscriptionState::Late => { - if tick_reason == TickReason::ReceivedPublishRequest && self.publishing_enabled && (p.notifications_available || p.more_notifications) { + if tick_reason == TickReason::ReceivePublishRequest && self.publishing_enabled && (p.notifications_available || p.more_notifications) { // State #10 self.reset_lifetime_counter(); self.state = SubscriptionState::Normal; self.message_sent = true; return UpdateStateResult::new(HandledState::Late10, UpdateStateAction::ReturnNotifications); - } else if tick_reason == TickReason::ReceivedPublishRequest && (!self.publishing_enabled || (self.publishing_enabled && !p.notifications_available && !p.more_notifications)) { + } else if tick_reason == TickReason::ReceivePublishRequest && (!self.publishing_enabled || (self.publishing_enabled && !p.notifications_available && !p.more_notifications)) { // State #11 self.reset_lifetime_counter(); self.state = SubscriptionState::KeepAlive; @@ -530,7 +542,7 @@ impl Subscription { } } SubscriptionState::KeepAlive => { - if tick_reason == TickReason::ReceivedPublishRequest { + if tick_reason == TickReason::ReceivePublishRequest { // State #13 return UpdateStateResult::new(HandledState::KeepAlive13, UpdateStateAction::None); } else if p.publishing_timer_expired && self.publishing_enabled && p.notifications_available && p.publishing_req_queued { diff --git a/server/src/subscriptions/subscriptions.rs b/server/src/subscriptions/subscriptions.rs index 810bcd688..2ea05b1a8 100644 --- a/server/src/subscriptions/subscriptions.rs +++ b/server/src/subscriptions/subscriptions.rs @@ -106,13 +106,15 @@ impl Subscriptions { } Err(StatusCode::BadTooManyPublishRequests) } else { - - + // Clear all acknowledged items here + // Acknowledge results + let results = self.process_subscription_acknowledgements(&request); // Add to the front of the queue - older items are popped from the back self.publish_request_queue.push_front(PublishRequestEntry { request_id, request, + results }); Ok(()) } @@ -217,9 +219,7 @@ impl Subscriptions { self.retransmission_queue.insert((subscription_id, notification_message.sequence_number), notification_message.clone()); // Acknowledge results - let results = self.process_subscription_acknowledgements(&publish_request.request); - - let response = self.make_publish_response(&publish_request, subscription_id, now, notification_message, more_notifications, available_sequence_numbers, results); + let response = self.make_publish_response(publish_request, subscription_id, now, notification_message, more_notifications, available_sequence_numbers); self.publish_response_queue.push_back(response); } @@ -329,7 +329,7 @@ impl Subscriptions { } } - fn make_publish_response(&self, publish_request: &PublishRequestEntry, subscription_id: u32, now: &DateTimeUtc, notification_message: NotificationMessage, more_notifications: bool, available_sequence_numbers: Option>, results: Option>) -> PublishResponseEntry { + fn make_publish_response(&self, publish_request: PublishRequestEntry, subscription_id: u32, now: &DateTimeUtc, notification_message: NotificationMessage, more_notifications: bool, available_sequence_numbers: Option>) -> PublishResponseEntry { let now = DateTime::from(now.clone()); PublishResponseEntry { request_id: publish_request.request_id, @@ -339,7 +339,7 @@ impl Subscriptions { available_sequence_numbers, more_notifications, notification_message, - results, + results: publish_request.results, diagnostic_infos: None, }.into(), } diff --git a/server/src/tests/mod.rs b/server/src/tests/mod.rs index f91dcecb6..00d44789e 100644 --- a/server/src/tests/mod.rs +++ b/server/src/tests/mod.rs @@ -110,6 +110,7 @@ pub fn expired_publish_requests() { request_header: RequestHeader::new(&NodeId::null(), &now, 1000), subscription_acknowledgements: None, }, + results: None, }; pr1.request.request_header.timeout_hint = 5001; @@ -119,6 +120,7 @@ pub fn expired_publish_requests() { request_header: RequestHeader::new(&NodeId::null(), &now, 2000), subscription_acknowledgements: None, }, + results: None, }; pr2.request.request_header.timeout_hint = 3000; @@ -160,6 +162,4 @@ pub fn expired_publish_requests() { panic!("Expected service faults for timed out publish requests") } } - - } \ No newline at end of file diff --git a/server/src/tests/subscriptions/subscription.rs b/server/src/tests/subscriptions/subscription.rs index 6888c5347..d96250f84 100644 --- a/server/src/tests/subscriptions/subscription.rs +++ b/server/src/tests/subscriptions/subscription.rs @@ -61,7 +61,7 @@ fn update_state_4() { // (PublishingEnabled == TRUE // && MoreNotifications == FALSE) // ) - let tick_reason = TickReason::ReceivedPublishRequest; + let tick_reason = TickReason::ReceivePublishRequest; let p = SubscriptionStateParams { notifications_available: true, more_notifications: false, @@ -94,7 +94,7 @@ fn update_state_5() { // set publish enabled true // set more notifications true - let tick_reason = TickReason::ReceivedPublishRequest; + let tick_reason = TickReason::ReceivePublishRequest; let p = SubscriptionStateParams { notifications_available: true, more_notifications: true, @@ -234,7 +234,7 @@ fn update_state_9() { fn update_state_10() { let mut s = make_subscription(SubscriptionState::Late); - let tick_reason = TickReason::ReceivedPublishRequest; + let tick_reason = TickReason::ReceivePublishRequest; let p = SubscriptionStateParams { notifications_available: true, more_notifications: false, @@ -256,7 +256,7 @@ fn update_state_10() { fn update_state_11() { let mut s = make_subscription(SubscriptionState::Late); - let tick_reason = TickReason::ReceivedPublishRequest; + let tick_reason = TickReason::ReceivePublishRequest; let p = SubscriptionStateParams { notifications_available: false, more_notifications: false, @@ -299,7 +299,7 @@ fn update_state_12() { fn update_state_13() { let mut s = make_subscription(SubscriptionState::KeepAlive); - let tick_reason = TickReason::ReceivedPublishRequest; + let tick_reason = TickReason::ReceivePublishRequest; let p = SubscriptionStateParams { notifications_available: false, more_notifications: false, @@ -416,7 +416,7 @@ fn update_state_27() { // set publish enabled true // set more notifications true - let tick_reason = TickReason::ReceivedPublishRequest; + let tick_reason = TickReason::ReceivePublishRequest; let p = SubscriptionStateParams { notifications_available: true, more_notifications: true, From 651ff8e7fde29b169eea5a5248fc33f1e33694fe Mon Sep 17 00:00:00 2001 From: Adam Lock Date: Sat, 6 Apr 2019 20:59:29 +0100 Subject: [PATCH 3/9] Explicitly ignore return value in Session::run() in samples. --- client/src/session.rs | 24 +++++++++++++++++++----- samples/gfx-client/src/main.rs | 2 +- samples/mqtt-client/src/main.rs | 2 +- samples/simple-client/src/main.rs | 2 +- samples/web-client/src/main.rs | 2 +- 5 files changed, 23 insertions(+), 9 deletions(-) diff --git a/client/src/session.rs b/client/src/session.rs index f979c7ad2..d45313efb 100644 --- a/client/src/session.rs +++ b/client/src/session.rs @@ -416,6 +416,9 @@ impl Session { /// as well as recovering from connection errors. The run command will break if the session is disconnected /// and cannot be reestablished. /// + /// The `run()` function returns a `Sender` that can be used to send a `()` message to the session + /// to cause it to terminate. + /// /// # Returns /// /// * `mpsc::Sender<()>` - A sender that allows the caller to send a single unity message to the @@ -427,8 +430,17 @@ impl Session { tx } - /// Runs the server asynchronously by spawning a thread for it to run on, allowing the calling - /// thread to proceed to do other things. + /// Runs the server asynchronously on a new thread, allowing the calling + /// thread to continue do other things. + /// + /// The `run()` function returns a `Sender` that can be used to send a `()` message to the session + /// to cause it to terminate. + /// + /// # Returns + /// + /// * `mpsc::Sender<()>` - A sender that allows the caller to send a single unity message to the + /// run loop to cause it to abort. + /// pub fn run_async(session: Arc>) -> mpsc::Sender<()> { let (tx, rx) = mpsc::channel(); thread::spawn(move || { @@ -437,8 +449,9 @@ impl Session { tx } - /// Main running loop for a session - pub fn run_loop(session: Arc>, sleep_interval: u64, rx: mpsc::Receiver<()>) { + /// The main running loop for a session. This is used by `run()` and `run_async()` to run + /// continuously until a signal is received to terminate. + fn run_loop(session: Arc>, sleep_interval: u64, rx: mpsc::Receiver<()>) { loop { // Main thread has nothing to do - just wait for publish events to roll in let mut session = session.write().unwrap(); @@ -1198,7 +1211,8 @@ impl Session { } /// This is the internal handler for create subscription that receives the callback wrapped up and reference counted. - fn create_subscription_inner(&mut self, publishing_interval: f64, lifetime_count: u32, max_keep_alive_count: u32, max_notifications_per_publish: u32, priority: u8, publishing_enabled: bool, + fn create_subscription_inner(&mut self, publishing_interval: f64, lifetime_count: u32, max_keep_alive_count: u32, max_notifications_per_publish: u32, + priority: u8, publishing_enabled: bool, callback: Arc>) -> Result { diff --git a/samples/gfx-client/src/main.rs b/samples/gfx-client/src/main.rs index 674704551..3d95bdf10 100644 --- a/samples/gfx-client/src/main.rs +++ b/samples/gfx-client/src/main.rs @@ -169,7 +169,7 @@ fn subscription_loop(nodes_to_monitor: Vec, session: Arc>, tx: mpsc::Sender<(NodeId, Da } // Loops forever. The publish thread will call the callback with changes on the variables - Session::run(session); + let _ = Session::run(session); Ok(()) } diff --git a/samples/simple-client/src/main.rs b/samples/simple-client/src/main.rs index 322ad9a14..c43d786ad 100644 --- a/samples/simple-client/src/main.rs +++ b/samples/simple-client/src/main.rs @@ -72,7 +72,7 @@ fn subscription_loop(session: Arc>) -> Result<(), StatusCode> { } // Loops forever. The publish thread will call the callback with changes on the variables - Session::run(session); + let _ = Session::run(session); Ok(()) } diff --git a/samples/web-client/src/main.rs b/samples/web-client/src/main.rs index 53559f162..e44c6969b 100644 --- a/samples/web-client/src/main.rs +++ b/samples/web-client/src/main.rs @@ -156,7 +156,7 @@ fn subscription_loop(session: Arc>) -> Result<(), StatusCode> { } // Loops forever. The publish thread will call the callback with changes on the variables - Session::run(session); + let _ = Session::run(session); Ok(()) } \ No newline at end of file From bd865b80e50e29b3655123107d0d7889caf60699 Mon Sep 17 00:00:00 2001 From: Adam Lock Date: Sat, 6 Apr 2019 21:03:45 +0100 Subject: [PATCH 4/9] Ensure discovery server registration has a retry limit of 1 --- server/src/discovery/mod.rs | 47 +++++++++++++++++++++---------------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/server/src/discovery/mod.rs b/server/src/discovery/mod.rs index b1b5dc33a..6611317fd 100644 --- a/server/src/discovery/mod.rs +++ b/server/src/discovery/mod.rs @@ -1,4 +1,4 @@ -use opcua_client::prelude::{Client, ClientConfig}; +use opcua_client::prelude::{Client, ClientConfig, ClientBuilder}; use crate::state::ServerState; @@ -7,34 +7,41 @@ pub fn register_with_discovery_server(discovery_server_url: &str, server_state: debug!("register_with_discovery_server, for {}", discovery_server_url); let server_config = trace_read_lock_unwrap!(server_state.config); - // Client's pki dir must match server's - let mut config = ClientConfig::new("DiscoveryClient", "urn:DiscoveryClient"); - config.pki_dir = server_config.pki_dir.clone(); - let mut client = Client::new(config); + // Create a client, ensuring to retry only once + let mut client = ClientBuilder::new() + .application_name("DiscoveryClient") + .application_uri("urn:DiscoveryClient") + .pki_dir(server_config.pki_dir.clone()) + .session_retry_limit(1) + .client(); - // This follows the local discovery process described in part 12 of the spec, calling - // find_servers on it first. + if let Some(mut client) = client { + // This follows the local discovery process described in part 12 of the spec, calling + // find_servers on it first. - // Connect to the server and call find_servers to ensure it is a discovery server - match client.find_servers(discovery_server_url) { - Ok(servers) => { - debug!("Servers on the discovery endpoint - {:?}", servers); - // Register the server - let registered_server = server_state.registered_server(); - match client.register_server(discovery_server_url, registered_server) { - Ok(_) => {} - Err(err) => { - error!(r#"Cannot register server with discovery server {}. + // Connect to the server and call find_servers to ensure it is a discovery server + match client.find_servers(discovery_server_url) { + Ok(servers) => { + debug!("Servers on the discovery endpoint - {:?}", servers); + // Register the server + let registered_server = server_state.registered_server(); + match client.register_server(discovery_server_url, registered_server) { + Ok(_) => {} + Err(err) => { + error!(r#"Cannot register server with discovery server {}. The errors immediately preceding this message may be caused by this issue. Check if the error "{}" indicates the reason why that the registration could not happen. The first thing you should ensure is that your server can connect to the discovery server and your server's cert is trusted by the discovery server and vice versa."#, discovery_server_url, err); + } } } + Err(err) => { + error!("Cannot find servers on discovery url {}, error = {:?}", discovery_server_url, err); + } } - Err(err) => { - error!("Cannot find servers on discovery url {}, error = {:?}", discovery_server_url, err); - } + } else { + error!("Cannot create a discovery server client config"); } debug!("register_with_discovery_server, finished"); From 7e330b4cdfa0adb471b23c1b973ca964e4d8e5bc Mon Sep 17 00:00:00 2001 From: Adam Lock Date: Sat, 6 Apr 2019 21:46:41 +0100 Subject: [PATCH 5/9] Put debug message in quotes --- server/src/discovery/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/discovery/mod.rs b/server/src/discovery/mod.rs index 6611317fd..ea1af02b1 100644 --- a/server/src/discovery/mod.rs +++ b/server/src/discovery/mod.rs @@ -28,7 +28,7 @@ pub fn register_with_discovery_server(discovery_server_url: &str, server_state: match client.register_server(discovery_server_url, registered_server) { Ok(_) => {} Err(err) => { - error!(r#"Cannot register server with discovery server {}. + error!(r#"Cannot register server with discovery server \"{}\". The errors immediately preceding this message may be caused by this issue. Check if the error "{}" indicates the reason why that the registration could not happen. The first thing you should ensure is that your server can connect to the discovery server and your From ae802500db6ae56e675928e38945c2a02af7914c Mon Sep 17 00:00:00 2001 From: Adam Lock Date: Sun, 7 Apr 2019 17:40:26 +0100 Subject: [PATCH 6/9] Subscription work --- samples/server.conf | 3 +- server/src/builder.rs | 23 ++++++-- server/src/config.rs | 20 ++++--- server/src/discovery/mod.rs | 4 +- server/src/services/message_handler.rs | 4 +- server/src/services/subscription.rs | 6 ++- server/src/session.rs | 4 +- server/src/state.rs | 4 +- server/src/subscriptions/monitored_item.rs | 11 ++-- server/src/subscriptions/subscription.rs | 59 ++++++++++++++------- server/src/subscriptions/subscriptions.rs | 10 ++-- server/src/tests/services/monitored_item.rs | 22 ++++---- server/src/tests/services/subscription.rs | 28 ++++++---- types/src/notification_message.rs | 1 + 14 files changed, 126 insertions(+), 73 deletions(-) diff --git a/samples/server.conf b/samples/server.conf index 4b517bdc2..e65cb36af 100644 --- a/samples/server.conf +++ b/samples/server.conf @@ -17,7 +17,8 @@ user_tokens: unused_user: user: unused pass: unused1 -discovery_url: "" +discovery_urls: + - "opc.tcp://127.0.0.1:4855/" endpoints: basic128rsa15_sign: path: / diff --git a/server/src/builder.rs b/server/src/builder.rs index 5b3dd32ec..001fbb211 100644 --- a/server/src/builder.rs +++ b/server/src/builder.rs @@ -32,6 +32,9 @@ impl ServerBuilder { ServerBuilder::new() .application_name(application_name) .endpoint("none", ServerEndpoint::new_none(DEFAULT_ENDPOINT_PATH, &user_token_ids)) + .discovery_urls(vec![ + DEFAULT_ENDPOINT_PATH.into() + ]) } /// Sample mode turns on everything including a hard coded user/pass @@ -67,6 +70,9 @@ impl ServerBuilder { ("basic256sha256_sign_encrypt", ServerEndpoint::new_basic256sha256_sign_encrypt(path, &user_token_ids)), ("no_access", ServerEndpoint::new_none("/noaccess", &[])) ]) + .discovery_urls(vec![ + DEFAULT_ENDPOINT_PATH.into() + ]) } /// Yields a [`Client`] from the values set by the builder. If the builder is not in a valid state @@ -75,7 +81,7 @@ impl ServerBuilder { /// [`Server`]: ../server/struct.Server.html pub fn server(self) -> Option { if self.is_valid() { - Some(Server::new(self.config)) + Some(Server::new(self.config())) } else { None } @@ -157,9 +163,18 @@ impl ServerBuilder { self } - /// Discovery endpoint url - the url of this server used by clients to get endpoints. - pub fn discovery_url(mut self, discovery_url: T) -> Self where T: Into { - self.config.discovery_url = discovery_url.into(); + /// Discovery endpoint urls - the urls of this server used by clients to get endpoints. + /// If the url is relative, e.g. "/" then the code will make a url for you using the port/host + /// settings as they are at the time this function is executed. + pub fn discovery_urls(mut self, discovery_urls: Vec) -> Self { + self.config.discovery_urls = discovery_urls.iter().map(|discovery_url| { + if discovery_url.starts_with("/") { + // Turn into an opc url + format!("opc.tcp://{}:{}/", self.config.tcp_config.host, self.config.tcp_config.port) + } else { + discovery_url.clone() + } + }).collect(); self } diff --git a/server/src/config.rs b/server/src/config.rs index 9b05169f2..fd76f0211 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -1,5 +1,5 @@ //! Provides configuration settings for the server including serialization and deserialization from file. -use std::path::{PathBuf}; +use std::path::PathBuf; use std::str::FromStr; use std::collections::{BTreeMap, BTreeSet}; @@ -228,7 +228,7 @@ pub struct ServerConfig { /// User tokens pub user_tokens: BTreeMap, /// discovery endpoint url which may or may not be the same as the service endpoints below. - pub discovery_url: String, + pub discovery_urls: Vec, /// Endpoints supported by the server pub endpoints: BTreeMap, /// Maximum number of subscriptions in a session @@ -263,15 +263,19 @@ impl Config for ServerConfig { } } if self.max_array_length == 0 { - error!("Server configuration is invalid. Max array length is invalid"); + error!("Server configuration is invalid. Max array length is invalid"); valid = false; } if self.max_string_length == 0 { - error!("Server configuration is invalid. Max string length is invalid"); + error!("Server configuration is invalid. Max string length is invalid"); valid = false; } if self.max_byte_string_length == 0 { - error!("Server configuration is invalid. Max byte string length is invalid"); + error!("Server configuration is invalid. Max byte string length is invalid"); + valid = false; + } + if self.discovery_urls.is_empty() { + error!("Server configuration is invalid. Discovery urls not set"); valid = false; } valid @@ -302,7 +306,7 @@ impl Default for ServerConfig { hello_timeout: constants::DEFAULT_HELLO_TIMEOUT_SECONDS, }, user_tokens: BTreeMap::new(), - discovery_url: String::new(), + discovery_urls: Vec::new(), endpoints: BTreeMap::new(), max_array_length: opcua_types_constants::MAX_ARRAY_LENGTH as u32, max_string_length: opcua_types_constants::MAX_STRING_LENGTH as u32, @@ -323,7 +327,7 @@ impl ServerConfig { let product_uri = format!("urn:{}", application_name); let pki_dir = PathBuf::from("./pki"); let discovery_server_url = Some(constants::DEFAULT_DISCOVERY_SERVER_URL.to_string()); - let discovery_url = format!("opc.tcp://{}:{}/", host, port); + let discovery_urls = vec![format!("opc.tcp://{}:{}/", host, port)]; ServerConfig { application_name, @@ -339,7 +343,7 @@ impl ServerConfig { hello_timeout: constants::DEFAULT_HELLO_TIMEOUT_SECONDS, }, user_tokens, - discovery_url, + discovery_urls, endpoints, max_array_length: opcua_types_constants::MAX_ARRAY_LENGTH as u32, max_string_length: opcua_types_constants::MAX_STRING_LENGTH as u32, diff --git a/server/src/discovery/mod.rs b/server/src/discovery/mod.rs index ea1af02b1..8afe029a5 100644 --- a/server/src/discovery/mod.rs +++ b/server/src/discovery/mod.rs @@ -1,4 +1,4 @@ -use opcua_client::prelude::{Client, ClientConfig, ClientBuilder}; +use opcua_client::prelude::ClientBuilder; use crate::state::ServerState; @@ -8,7 +8,7 @@ pub fn register_with_discovery_server(discovery_server_url: &str, server_state: let server_config = trace_read_lock_unwrap!(server_state.config); // Create a client, ensuring to retry only once - let mut client = ClientBuilder::new() + let client = ClientBuilder::new() .application_name("DiscoveryClient") .application_uri("urn:DiscoveryClient") .pki_dir(server_config.pki_dir.clone()) diff --git a/server/src/services/message_handler.rs b/server/src/services/message_handler.rs index 60da47d42..d50747cad 100644 --- a/server/src/services/message_handler.rs +++ b/server/src/services/message_handler.rs @@ -1,5 +1,7 @@ use std::sync::{Arc, RwLock}; +use chrono::Utc; + use opcua_core::crypto::{CertificateStore, SecurityPolicy}; use opcua_types::*; use opcua_types::service_types::*; @@ -267,7 +269,7 @@ impl MessageHandler { // Unlike other calls which return immediately, this one is asynchronous - the // request is queued and the response will come back out of sequence some time in // the future. - self.subscription_service.async_publish(&mut session, request_id, request)? + self.subscription_service.async_publish(&Utc::now(), &mut session, &address_space, request_id, &request)? } } SupportedMessage::RepublishRequest(ref request) => { diff --git a/server/src/services/subscription.rs b/server/src/services/subscription.rs index c2ceace02..e0e26ca0e 100644 --- a/server/src/services/subscription.rs +++ b/server/src/services/subscription.rs @@ -5,7 +5,9 @@ use opcua_types::status_code::StatusCode; use opcua_types::service_types::*; use crate::{ + DateTimeUtc, subscriptions::subscription::Subscription, + address_space::AddressSpace, state::ServerState, session::Session, services::Service, @@ -180,13 +182,13 @@ impl SubscriptionService { } /// Handles a PublishRequest. This is asynchronous, so the response will be sent later on. - pub fn async_publish(&self, session: &mut Session, request_id: u32, request: &PublishRequest) -> Result, StatusCode> { + pub fn async_publish(&self, now: &DateTimeUtc, session: &mut Session, address_space: &AddressSpace, request_id: u32, request: &PublishRequest) -> Result, StatusCode> { trace!("--> Receive a PublishRequest {:?}", request); if session.subscriptions.is_empty() { Ok(Some(self.service_fault(&request.request_header, StatusCode::BadNoSubscription))) } else { let request_header = request.request_header.clone(); - let result = session.enqueue_publish_request(request_id, request.clone()); + let result = session.enqueue_publish_request(now, request_id, request.clone(), address_space); if let Err(error) = result { Ok(Some(self.service_fault(&request_header, error))) } else { diff --git a/server/src/session.rs b/server/src/session.rs index b5a60cce4..f0e9505dc 100644 --- a/server/src/session.rs +++ b/server/src/session.rs @@ -175,8 +175,8 @@ impl Session { self.terminated_at = chrono::Utc::now(); } - pub(crate) fn enqueue_publish_request(&mut self, request_id: u32, request: PublishRequest) -> Result<(), StatusCode> { - self.subscriptions.enqueue_publish_request(request_id, request) + pub(crate) fn enqueue_publish_request(&mut self, now: &DateTimeUtc, request_id: u32, request: PublishRequest, address_space: &AddressSpace) -> Result<(), StatusCode> { + self.subscriptions.enqueue_publish_request(now, request_id, request, address_space) } pub(crate) fn tick_subscriptions(&mut self, now: &DateTimeUtc, address_space: &AddressSpace, reason: TickReason) -> Result<(), StatusCode> { diff --git a/server/src/state.rs b/server/src/state.rs index 90c6d97bf..79ffbe4b8 100644 --- a/server/src/state.rs +++ b/server/src/state.rs @@ -170,10 +170,10 @@ impl ServerState { pub fn discovery_urls(&self) -> Option> { let config = trace_read_lock_unwrap!(self.config); - if config.discovery_url.is_empty() { + if config.discovery_urls.is_empty() { None } else { - Some(vec![UAString::from(config.discovery_url.as_ref())]) + Some(config.discovery_urls.iter().map(|url| UAString::from(url.as_ref())).collect()) } } diff --git a/server/src/subscriptions/monitored_item.rs b/server/src/subscriptions/monitored_item.rs index cbf8547c6..45b69c1f8 100644 --- a/server/src/subscriptions/monitored_item.rs +++ b/server/src/subscriptions/monitored_item.rs @@ -70,7 +70,7 @@ pub(crate) enum TickResult { } impl MonitoredItem { - pub fn new(monitored_item_id: u32, timestamps_to_return: TimestampsToReturn, request: &MonitoredItemCreateRequest) -> Result { + pub fn new(now: &DateTimeUtc, monitored_item_id: u32, timestamps_to_return: TimestampsToReturn, request: &MonitoredItemCreateRequest) -> Result { let filter = FilterType::from_filter(&request.requested_parameters.filter)?; let sampling_interval = Self::sanitize_sampling_interval(request.requested_parameters.sampling_interval); let queue_size = Self::sanitize_queue_size(request.requested_parameters.queue_size as usize); @@ -84,7 +84,7 @@ impl MonitoredItem { filter, discard_oldest: request.requested_parameters.discard_oldest, timestamps_to_return, - last_sample_time: chrono::Utc::now(), + last_sample_time: now.clone(), last_data_value: None, queue_size, notification_queue: VecDeque::with_capacity(queue_size), @@ -138,7 +138,7 @@ impl MonitoredItem { /// /// Function returns a `TickResult` denoting if the value changed or not, and whether it should /// be reported. - pub fn tick(&mut self, address_space: &AddressSpace, now: &DateTimeUtc, publishing_interval_elapsed: bool, resend_data: bool) -> TickResult { + pub fn tick(&mut self, now: &DateTimeUtc, address_space: &AddressSpace, publishing_interval_elapsed: bool, resend_data: bool) -> TickResult { if self.monitoring_mode == MonitoringMode::Disabled { TickResult::NoChange } else { @@ -392,11 +392,6 @@ impl MonitoredItem { &self.notification_queue } - #[cfg(test)] - pub fn discard_oldest(&self) -> bool { - self.discard_oldest - } - #[cfg(test)] pub(crate) fn set_discard_oldest(&mut self, discard_oldest: bool) { self.discard_oldest = discard_oldest; diff --git a/server/src/subscriptions/subscription.rs b/server/src/subscriptions/subscription.rs index 24c4ce62e..8895d3179 100644 --- a/server/src/subscriptions/subscription.rs +++ b/server/src/subscriptions/subscription.rs @@ -96,18 +96,10 @@ impl UpdateStateResult { #[derive(Debug, Copy, Clone, PartialEq)] pub(crate) enum TickReason { ReceivePublishRequest, -// PublishingTimerExpires, + // PublishingTimerExpires, TickTimerFired, } -#[derive(Debug, Copy, Clone, PartialEq)] -pub(crate) enum StateEvent -{ - ReceivePublishRequest, - PublishingTimerExpires, -} - - #[derive(Debug, Clone, Serialize)] pub struct Subscription { /// Subscription id @@ -144,6 +136,10 @@ pub struct Subscription { resend_data: bool, /// The next sequence number to be sent sequence_number: Handle, + /// Last notification's sequence number. This is a sanity check since sequence numbers should start from + /// 1 and be sequential - it that doesn't happen the server will panic because something went + /// wrong somewhere. + last_sequence_number: u32, // The last monitored item id next_monitored_item_id: u32, // The time that the subscription interval last fired @@ -186,6 +182,7 @@ impl Subscription { resend_data: false, // Counters for new items sequence_number: Handle::new(1), + last_sequence_number: 0, next_monitored_item_id: 1, last_timer_expired_time: chrono::Utc::now(), notifications: VecDeque::with_capacity(100), @@ -211,7 +208,8 @@ impl Subscription { items_to_create.iter().map(|item_to_create| { // Create a monitored item, if possible let monitored_item_id = self.next_monitored_item_id; - match MonitoredItem::new(monitored_item_id, timestamps_to_return, item_to_create) { + let now = chrono::Utc::now(); + match MonitoredItem::new(&now, monitored_item_id, timestamps_to_return, item_to_create) { Ok(monitored_item) => { // Register the item with the subscription let revised_sampling_interval = monitored_item.sampling_interval(); @@ -310,7 +308,7 @@ impl Subscription { /// Checks the subscription and monitored items for state change, messages. Returns `true` /// if there are zero or more notifications waiting to be processed. - pub(crate) fn tick(&mut self, address_space: &AddressSpace, tick_reason: TickReason, publishing_req_queued: bool, now: &DateTimeUtc) { + pub(crate) fn tick(&mut self, now: &DateTimeUtc, address_space: &AddressSpace, tick_reason: TickReason, publishing_req_queued: bool) { // Check if the publishing interval has elapsed. Only checks on the tick timer. let publishing_interval_elapsed = match tick_reason { TickReason::ReceivePublishRequest => false, @@ -322,7 +320,9 @@ impl Subscription { // Look at the last expiration time compared to now and see if it matches // or exceeds the publishing interval let publishing_interval = super::duration_from_ms(self.publishing_interval); - if now.signed_duration_since(self.last_timer_expired_time) >= publishing_interval { + let elapsed = now.signed_duration_since(self.last_timer_expired_time); + println!("Now = {:?}, Last expired = {:?}, interval = {:?}, elapsed = {:?}, expired = {:?}", now, self.last_timer_expired_time, publishing_interval, elapsed, elapsed >= publishing_interval); + if elapsed >= publishing_interval { self.last_timer_expired_time = *now; true } else { @@ -339,7 +339,7 @@ impl Subscription { SubscriptionState::Closed | SubscriptionState::Creating => (None, false), _ => { let resend_data = self.resend_data; - self.tick_monitored_items(address_space, now, publishing_interval_elapsed, resend_data) + self.tick_monitored_items(now, address_space, publishing_interval_elapsed, resend_data) } }; self.resend_data = false; @@ -361,6 +361,18 @@ impl Subscription { } } + fn enqueue_notification(&mut self, notification: NotificationMessage) { + use std::u32; + // For sanity, check the sequence number is the expected sequence number. + let expected_sequence_number = if self.last_sequence_number == u32::MAX { 1 } else { self.last_sequence_number + 1 }; + if notification.sequence_number != expected_sequence_number { + panic!("Notification's sequence number is not sequential, expecting {}, got {}", expected_sequence_number, notification.sequence_number); + } + debug!("Enqueuing notification {:?}", notification); + self.last_sequence_number = notification.sequence_number; + self.notifications.push_back(notification); + } + fn handle_state_result(&mut self, update_state_result: UpdateStateResult, notification: Option, now: &DateTimeUtc) { // Now act on the state's action match update_state_result.update_state_action { @@ -383,23 +395,32 @@ impl Subscription { } // Send a keep alive debug!("Sending keep alive response"); - self.notifications.push_back(NotificationMessage::keep_alive(self.sequence_number.next(), DateTime::from(now.clone()))); + let notification = NotificationMessage::keep_alive(self.sequence_number.next(), DateTime::from(now.clone())); + self.enqueue_notification(notification); } UpdateStateAction::ReturnNotifications => { // Add the notification message to the queue if let Some(notification) = notification { - self.notifications.push_back(notification); + self.enqueue_notification(notification); } } UpdateStateAction::SubscriptionCreated => { + if notification.is_some() { + panic!("SubscriptionCreated got a notification"); + } // Subscription was created successfully - self.notifications.push_back(NotificationMessage::status_change(self.sequence_number.next(), DateTime::from(now.clone()), StatusCode::Good)) +// let notification = NotificationMessage::status_change(self.sequence_number.next(), DateTime::from(now.clone()), StatusCode::Good); +// self.enqueue_notification(notification); } UpdateStateAction::SubscriptionExpired => { + if notification.is_some() { + panic!("SubscriptionExpired got a notification"); + } // Delete the monitored items, issue a status change for the subscription debug!("Subscription status change to closed / timeout"); self.monitored_items.clear(); - self.notifications.push_back(NotificationMessage::status_change(self.sequence_number.next(), DateTime::from(now.clone()), StatusCode::BadTimeout)) + let notification = NotificationMessage::status_change(self.sequence_number.next(), DateTime::from(now.clone()), StatusCode::BadTimeout); + self.enqueue_notification(notification); } } } @@ -583,14 +604,14 @@ impl Subscription { /// /// The function returns a `notifications` and a `more_notifications` boolean to indicate if the notifications /// are available. - fn tick_monitored_items(&mut self, address_space: &AddressSpace, now: &DateTimeUtc, publishing_interval_elapsed: bool, resend_data: bool) -> (Option, bool) { + fn tick_monitored_items(&mut self, now: &DateTimeUtc, address_space: &AddressSpace, publishing_interval_elapsed: bool, resend_data: bool) -> (Option, bool) { let mut triggered_items: BTreeSet = BTreeSet::new(); let mut notification_messages = Vec::new(); for (_, monitored_item) in &mut self.monitored_items { // If this returns true then the monitored item wants to report its notification let monitoring_mode = monitored_item.monitoring_mode(); - match monitored_item.tick(address_space, now, publishing_interval_elapsed, resend_data) { + match monitored_item.tick(now, address_space, publishing_interval_elapsed, resend_data) { TickResult::ReportValueChanged => { // If this monitored item has triggered items, then they need to be handled match monitoring_mode { diff --git a/server/src/subscriptions/subscriptions.rs b/server/src/subscriptions/subscriptions.rs index ed57dd999..aa5593dce 100644 --- a/server/src/subscriptions/subscriptions.rs +++ b/server/src/subscriptions/subscriptions.rs @@ -93,7 +93,7 @@ impl Subscriptions { /// /// If the queue is full this call will pop the oldest and generate a service fault /// for that before pushing the new one. - pub fn enqueue_publish_request(&mut self, request_id: u32, request: PublishRequest) -> Result<(), StatusCode> { + pub(crate) fn enqueue_publish_request(&mut self, now: &DateTimeUtc, request_id: u32, request: PublishRequest, address_space: &AddressSpace) -> Result<(), StatusCode> { // Check if we have too many requests already let max_publish_requests = self.max_publish_requests(); if self.publish_request_queue.len() >= max_publish_requests { @@ -114,9 +114,11 @@ impl Subscriptions { self.publish_request_queue.push_front(PublishRequestEntry { request_id, request, - results + results, }); - Ok(()) + + // Tick to trigger publish + self.tick(now, address_space, TickReason::ReceivePublishRequest) } } @@ -180,7 +182,7 @@ impl Subscriptions { // Now tick the subscription to see if it has any notifications. If there are // notifications then the publish response will be associated with his subscription // and ready to go. - subscription.tick(address_space, tick_reason, publishing_req_queued, now); + subscription.tick(now, address_space, tick_reason, publishing_req_queued); // Process any notifications loop { diff --git a/server/src/tests/services/monitored_item.rs b/server/src/tests/services/monitored_item.rs index 70170510b..e21ec37eb 100644 --- a/server/src/tests/services/monitored_item.rs +++ b/server/src/tests/services/monitored_item.rs @@ -80,15 +80,17 @@ fn set_triggering(session: &mut Session, subscription_id: u32, monitored_item_id (response.add_results, response.remove_results) } -fn publish_request(session: &mut Session, ss: &SubscriptionService) { +fn publish_request(session: &mut Session, address_space: &AddressSpace, ss: &SubscriptionService) { let request_id = 1001; let request = PublishRequest { request_header: RequestHeader::new(&NodeId::null(), &DateTime::now(), 1), subscription_acknowledgements: None, }; + let now = Utc::now(); + session.subscriptions.publish_request_queue().clear(); - let response = ss.async_publish(session, request_id, &request).unwrap(); + let response = ss.async_publish(&now, session, address_space, request_id, &request).unwrap(); assert!(response.is_none()); assert!(!session.subscriptions.publish_request_queue().is_empty()); } @@ -100,7 +102,7 @@ fn publish_response(session: &mut Session) -> PublishResponse { } fn publish_tick_no_response(session: &mut Session, ss: &SubscriptionService, address_space: &AddressSpace, now: DateTimeUtc, duration: chrono::Duration) -> DateTimeUtc { - publish_request(session, ss); + publish_request(session, address_space, ss); let now = now.add(duration); let _ = session.tick_subscriptions(&now, address_space, TickReason::TickTimerFired); assert_eq!(session.subscriptions.publish_response_queue().len(), 0); @@ -112,7 +114,7 @@ fn publish_tick_no_response(session: &mut Session, ss: &SubscriptionService, add fn publish_tick_response(session: &mut Session, ss: &SubscriptionService, address_space: &AddressSpace, now: DateTimeUtc, duration: chrono::Duration, handler: T) -> DateTimeUtc where T: FnOnce(PublishResponse) { - publish_request(session, ss); + publish_request(session, address_space, ss); let now = now.add(duration); let _ = session.tick_subscriptions(&now, address_space, TickReason::TickTimerFired); assert_eq!(session.subscriptions.publish_response_queue().len(), 1); @@ -123,7 +125,7 @@ fn publish_tick_response(session: &mut Session, ss: &SubscriptionService, add fn populate_monitored_item(discard_oldest: bool) -> MonitoredItem { let client_handle = 999; - let mut monitored_item = MonitoredItem::new(1, TimestampsToReturn::Both, &make_create_request(-1f64, 5)).unwrap(); + let mut monitored_item = MonitoredItem::new(&chrono::Utc::now(), 1, TimestampsToReturn::Both, &make_create_request(-1f64, 5)).unwrap(); monitored_item.set_discard_oldest(discard_oldest); for i in 0..5 { monitored_item.enqueue_notification_message(MonitoredItemNotification { @@ -275,20 +277,20 @@ fn monitored_item_data_change_filter() { // Create request should monitor attribute of variable, e.g. value // Sample interval is negative so it will always test on repeated calls - let mut monitored_item = MonitoredItem::new(1, TimestampsToReturn::Both, &make_create_request(-1f64, 5)).unwrap(); + let mut monitored_item = MonitoredItem::new(&chrono::Utc::now(), 1, TimestampsToReturn::Both, &make_create_request(-1f64, 5)).unwrap(); let now = Utc::now(); assert_eq!(monitored_item.notification_queue().len(), 0); // Expect first call to always succeed - assert_eq!(monitored_item.tick(&address_space, &now, false, false), TickResult::ReportValueChanged); + assert_eq!(monitored_item.tick(&now, &address_space, false, false), TickResult::ReportValueChanged); // Expect one item in its queue assert_eq!(monitored_item.notification_queue().len(), 1); // Expect false on next tick, with the same value because no subscription timer has fired - assert_eq!(monitored_item.tick(&address_space, &now, false, false), TickResult::NoChange); + assert_eq!(monitored_item.tick(&now, &address_space, false, false), TickResult::NoChange); assert_eq!(monitored_item.notification_queue().len(), 1); // adjust variable value @@ -301,8 +303,8 @@ fn monitored_item_data_change_filter() { } // Expect change but only when subscription timer elapsed - assert_eq!(monitored_item.tick(&address_space, &now, false, false), TickResult::NoChange); - assert_eq!(monitored_item.tick(&address_space, &now, true, false), TickResult::ReportValueChanged); + assert_eq!(monitored_item.tick(&now, &address_space, false, false), TickResult::NoChange); + assert_eq!(monitored_item.tick(&now, &address_space, true, false), TickResult::ReportValueChanged); assert_eq!(monitored_item.notification_queue().len(), 2); } diff --git a/server/src/tests/services/subscription.rs b/server/src/tests/services/subscription.rs index a88c66f7b..b52b17dd4 100644 --- a/server/src/tests/services/subscription.rs +++ b/server/src/tests/services/subscription.rs @@ -72,14 +72,14 @@ fn test_revised_keep_alive_lifetime_counts() { #[test] fn publish_with_no_subscriptions() { - do_subscription_service_test(|_, session, _, ss, _| { + do_subscription_service_test(|_, session, address_space, ss, _| { let request = PublishRequest { request_header: RequestHeader::new(&NodeId::null(), &DateTime::now(), 1), subscription_acknowledgements: None, // Option>, }; // Publish and expect a service fault BadNoSubscription let request_id = 1001; - let response = ss.async_publish(session, request_id, &request).unwrap().unwrap(); + let response = ss.async_publish(&Utc::now(), session, address_space, request_id, &request).unwrap().unwrap(); let response: ServiceFault = supported_message_as!(response, ServiceFault); assert_eq!(response.response_header.service_result, StatusCode::BadNoSubscription); }) @@ -106,14 +106,16 @@ fn publish_response_subscription() { }; debug!("PublishRequest {:#?}", request); + let now = Utc::now(); + // Don't expect a response right away - let response = ss.async_publish(session, request_id, &request).unwrap(); + let response = ss.async_publish(&now, session, address_space, request_id, &request).unwrap(); assert!(response.is_none()); assert!(!session.subscriptions.publish_request_queue().is_empty()); // Tick subscriptions to trigger a change - let now = Utc::now().add(chrono::Duration::seconds(2)); + let now = now.add(chrono::Duration::seconds(2)); let _ = session.tick_subscriptions(&now, &address_space, TickReason::TickTimerFired); // Ensure publish request was processed into a publish response @@ -177,11 +179,13 @@ fn resend_data() { }; debug!("PublishRequest {:#?}", request); + let now = Utc::now(); + // Don't expect a response right away - let _response = ss.async_publish(session, 1001, &request).unwrap(); + let _response = ss.async_publish(&now, session, address_space, 1001, &request).unwrap(); // Tick subscriptions to trigger a change - let now = Utc::now().add(chrono::Duration::seconds(2)); + let now = now.add(chrono::Duration::seconds(2)); let _ = session.tick_subscriptions(&now, &address_space, TickReason::TickTimerFired); // Ensure publish request was processed into a publish response @@ -209,11 +213,13 @@ fn resend_data() { }; debug!("PublishRequest {:#?}", request); + let now = Utc::now(); + // Don't expect a response right away - let _response = ss.async_publish(session, 1002, &request).unwrap(); + let _response = ss.async_publish(&now, session, address_space, 1002, &request).unwrap(); // Tick subscriptions to trigger a change - let now = Utc::now().add(chrono::Duration::seconds(2)); + let now = now.add(chrono::Duration::seconds(2)); let _ = session.tick_subscriptions(&now, &address_space, TickReason::TickTimerFired); // Ensure publish request was processed into a publish response @@ -274,14 +280,16 @@ fn publish_keep_alive() { }; debug!("PublishRequest {:#?}", request); + let now = Utc::now(); + // Don't expect a response right away - let response = ss.async_publish(session, request_id, &request).unwrap(); + let response = ss.async_publish(&now, session, address_space, request_id, &request).unwrap(); assert!(response.is_none()); assert!(!session.subscriptions.publish_request_queue().is_empty()); // Tick subscriptions to trigger a change - let now = Utc::now().add(chrono::Duration::seconds(2)); + let now = now.add(chrono::Duration::seconds(2)); let _ = session.tick_subscriptions(&now, &address_space, TickReason::TickTimerFired); // Ensure publish request was processed into a publish response diff --git a/types/src/notification_message.rs b/types/src/notification_message.rs index ba33086a3..311f88431 100644 --- a/types/src/notification_message.rs +++ b/types/src/notification_message.rs @@ -1,4 +1,5 @@ ///! Helpers for NotificationMessage types + use crate::{ date_time::DateTime, encoding::DecodingLimits, From 4706e41c186c3eed277b47761631bd5fce8313a7 Mon Sep 17 00:00:00 2001 From: Adam Lock Date: Mon, 8 Apr 2019 20:58:15 +0100 Subject: [PATCH 7/9] More subscription work, one test to fix --- client/src/client.rs | 2 +- server/src/address_space/address_space.rs | 1 - server/src/continuation_point.rs | 4 +- server/src/diagnostics.rs | 4 +- server/src/http/mod.rs | 4 - server/src/lib.rs | 2 - server/src/services/monitored_item.rs | 3 +- server/src/services/subscription.rs | 1 - server/src/session.rs | 1 - server/src/subscriptions/monitored_item.rs | 26 +--- server/src/subscriptions/subscription.rs | 156 ++++++++++---------- server/src/subscriptions/subscriptions.rs | 6 - server/src/tests/services/monitored_item.rs | 17 ++- server/src/tests/services/subscription.rs | 13 +- types/src/date_time.rs | 18 +-- 15 files changed, 116 insertions(+), 142 deletions(-) diff --git a/client/src/client.rs b/client/src/client.rs index f0df1d019..61d797b6c 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -167,7 +167,7 @@ impl Client { }; info!("Server has these endpoints:"); - endpoints.iter().for_each(|e| println!(" {} - {:?} / {:?}", e.endpoint_url, + endpoints.iter().for_each(|e| info!(" {} - {:?} / {:?}", e.endpoint_url, SecurityPolicy::from_str(e.security_policy_uri.as_ref()).unwrap(), e.security_mode)); diff --git a/server/src/address_space/address_space.rs b/server/src/address_space/address_space.rs index 6c33ef5ef..0ec26f71a 100644 --- a/server/src/address_space/address_space.rs +++ b/server/src/address_space/address_space.rs @@ -22,7 +22,6 @@ use crate::{ state::ServerState, session::Session, constants, - DateTimeUtc, }; /// Searches for the specified node by type, expecting it to exist diff --git a/server/src/continuation_point.rs b/server/src/continuation_point.rs index 3fd8cbf31..4a98509f5 100644 --- a/server/src/continuation_point.rs +++ b/server/src/continuation_point.rs @@ -2,11 +2,9 @@ use std::sync::{Arc, Mutex}; -use opcua_types::ByteString; +use opcua_types::{ByteString, DateTimeUtc}; use opcua_types::service_types::ReferenceDescription; -use crate::DateTimeUtc; - use crate::prelude::AddressSpace; #[derive(Clone)] diff --git a/server/src/diagnostics.rs b/server/src/diagnostics.rs index 83b7052b5..995898c0e 100644 --- a/server/src/diagnostics.rs +++ b/server/src/diagnostics.rs @@ -34,7 +34,7 @@ impl Runtime { let mut running_components = trace_lock_unwrap!(self.running_components); let key = name.into(); if running_components.contains(&key) { - error!("Shouldn't be registering component {} more than once", key); + trace!("Shouldn't be registering component {} more than once", key); } running_components.insert(key); } @@ -43,7 +43,7 @@ impl Runtime { let mut running_components = trace_lock_unwrap!(self.running_components); let key = name.into(); if !running_components.contains(&key) { - error!("Shouldn't be deregistering component {} which doesn't exist", key); + trace!("Shouldn't be deregistering component {} which doesn't exist", key); } running_components.remove(&key); } diff --git a/server/src/http/mod.rs b/server/src/http/mod.rs index dc4ed2260..f9d6a942c 100644 --- a/server/src/http/mod.rs +++ b/server/src/http/mod.rs @@ -28,10 +28,6 @@ struct HttpState { server_metrics: Arc>, } -fn index(_: &HttpRequest) -> impl Responder { - fs::NamedFile::open("html/index.html") -} - #[cfg(debug_assertions)] fn abort(req: &HttpRequest) -> impl Responder { let state = req.state(); diff --git a/server/src/lib.rs b/server/src/lib.rs index 809908dd7..de41c3029 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -35,8 +35,6 @@ extern crate serde_derive; #[macro_use] extern crate opcua_core; -type DateTimeUtc = chrono::DateTime; - lazy_static! { static ref RUNTIME: diagnostics::Runtime = diagnostics::Runtime::default(); } diff --git a/server/src/services/monitored_item.rs b/server/src/services/monitored_item.rs index 1ab19e80f..3845b365d 100644 --- a/server/src/services/monitored_item.rs +++ b/server/src/services/monitored_item.rs @@ -23,7 +23,8 @@ impl MonitoredItemService { if let Some(ref items_to_create) = request.items_to_create { // Find subscription and add items to it if let Some(subscription) = session.subscriptions.get_mut(request.subscription_id) { - let results = Some(subscription.create_monitored_items(request.timestamps_to_return, items_to_create)); + let now = chrono::Utc::now(); + let results = Some(subscription.create_monitored_items(&now, request.timestamps_to_return, items_to_create)); let response = CreateMonitoredItemsResponse { response_header: ResponseHeader::new_good(&request.request_header), results, diff --git a/server/src/services/subscription.rs b/server/src/services/subscription.rs index e0e26ca0e..4f5dde906 100644 --- a/server/src/services/subscription.rs +++ b/server/src/services/subscription.rs @@ -5,7 +5,6 @@ use opcua_types::status_code::StatusCode; use opcua_types::service_types::*; use crate::{ - DateTimeUtc, subscriptions::subscription::Subscription, address_space::AddressSpace, state::ServerState, diff --git a/server/src/session.rs b/server/src/session.rs index f0e9505dc..2be595810 100644 --- a/server/src/session.rs +++ b/server/src/session.rs @@ -15,7 +15,6 @@ use crate::{ address_space::AddressSpace, continuation_point::BrowseContinuationPoint, diagnostics::ServerDiagnostics, - DateTimeUtc, server::Server, subscriptions::subscription::TickReason, subscriptions::subscriptions::Subscriptions, diff --git a/server/src/subscriptions/monitored_item.rs b/server/src/subscriptions/monitored_item.rs index 45b69c1f8..cf671cbda 100644 --- a/server/src/subscriptions/monitored_item.rs +++ b/server/src/subscriptions/monitored_item.rs @@ -1,8 +1,6 @@ use std::result::Result; use std::collections::{VecDeque, BTreeSet}; -use chrono; - use opcua_types::{ *, status_code::StatusCode, @@ -12,7 +10,7 @@ use opcua_types::{ }, }; -use crate::{constants, DateTimeUtc, address_space::AddressSpace}; +use crate::{constants, address_space::AddressSpace}; #[derive(Debug, Clone, PartialEq, Serialize)] pub(crate) enum FilterType { @@ -142,8 +140,8 @@ impl MonitoredItem { if self.monitoring_mode == MonitoringMode::Disabled { TickResult::NoChange } else { - let check_value = if resend_data || self.last_data_value.is_none() { - // Always check on the first tick + let check_value = if resend_data { + // Always check for resend_data flag true } else if self.sampling_interval < 0f64 { // -1 means use the subscription publishing interval so if the publishing interval elapsed, @@ -163,7 +161,9 @@ impl MonitoredItem { // Test the value (or don't) if check_value { // Indicate a change if reporting is enabled - if self.check_value(address_space, now, resend_data) { + let first_tick = self.last_data_value.is_none(); + let value_changed = self.check_value(address_space, now, resend_data); + if first_tick || value_changed { if self.monitoring_mode == MonitoringMode::Reporting { TickResult::ReportValueChanged } else { @@ -299,20 +299,8 @@ impl MonitoredItem { } } - /// Gets the last notification (and discards the remainder to prevent out of sequence events) from - /// the notification queue. - #[cfg(test)] - pub fn latest_notification_message(&mut self) -> Option { - let result = self.notification_queue.pop_back(); - if result.is_some() { - self.queue_overflow = false; - self.notification_queue.clear(); - } - result - } - /// Retrieves all the notification messages from the queue, oldest to newest - pub fn all_notification_messages(&mut self) -> Option> { + pub fn all_notifications(&mut self) -> Option> { if self.notification_queue.is_empty() { None } else { diff --git a/server/src/subscriptions/subscription.rs b/server/src/subscriptions/subscription.rs index 8895d3179..4292f3315 100644 --- a/server/src/subscriptions/subscription.rs +++ b/server/src/subscriptions/subscription.rs @@ -15,7 +15,6 @@ use opcua_core::handle::Handle; use crate::{ constants, - DateTimeUtc, subscriptions::monitored_item::{MonitoredItem, TickResult}, address_space::AddressSpace, diagnostics::ServerDiagnostics, @@ -128,7 +127,7 @@ pub struct Subscription { /// Message has been sent on the Subscription. It is a flag that is used to ensure that either /// a NotificationMessage or a keep-alive Message is sent out the first time the publishing timer /// expires. - message_sent: bool, + first_message_sent: bool, /// The parameter that requests publishing to be enabled or disabled. publishing_enabled: bool, /// A flag that tells the subscription to send the latest value of every monitored item on the @@ -177,7 +176,7 @@ impl Subscription { state: SubscriptionState::Creating, lifetime_counter, keep_alive_counter, - message_sent: false, + first_message_sent: false, publishing_enabled, resend_data: false, // Counters for new items @@ -201,15 +200,14 @@ impl Subscription { } /// Creates monitored items on the specified subscription, returning the creation results - pub fn create_monitored_items(&mut self, timestamps_to_return: TimestampsToReturn, items_to_create: &[MonitoredItemCreateRequest]) -> Vec { + pub fn create_monitored_items(&mut self, now: &DateTimeUtc, timestamps_to_return: TimestampsToReturn, items_to_create: &[MonitoredItemCreateRequest]) -> Vec { self.reset_lifetime_counter(); // Add items to the subscription if they're not already in its items_to_create.iter().map(|item_to_create| { // Create a monitored item, if possible let monitored_item_id = self.next_monitored_item_id; - let now = chrono::Utc::now(); - match MonitoredItem::new(&now, monitored_item_id, timestamps_to_return, item_to_create) { + match MonitoredItem::new(now, monitored_item_id, timestamps_to_return, item_to_create) { Ok(monitored_item) => { // Register the item with the subscription let revised_sampling_interval = monitored_item.sampling_interval(); @@ -306,28 +304,35 @@ impl Subscription { self.resend_data = true; } + /// Tests if the publishing interval has elapsed since the last time this function in which case + /// it returns `true` and updates its internal state. + fn test_and_set_publishing_timer_expired(&mut self, now: &DateTimeUtc) -> bool { + // Look at the last expiration time compared to now and see if it matches + // or exceeds the publishing interval + let publishing_interval = super::duration_from_ms(self.publishing_interval); + let elapsed = now.signed_duration_since(self.last_timer_expired_time); + if elapsed >= publishing_interval { + self.last_timer_expired_time = *now; + true + } else { + false + } + } + /// Checks the subscription and monitored items for state change, messages. Returns `true` /// if there are zero or more notifications waiting to be processed. pub(crate) fn tick(&mut self, now: &DateTimeUtc, address_space: &AddressSpace, tick_reason: TickReason, publishing_req_queued: bool) { // Check if the publishing interval has elapsed. Only checks on the tick timer. - let publishing_interval_elapsed = match tick_reason { - TickReason::ReceivePublishRequest => false, + let publishing_timer_expired = match tick_reason { + TickReason::ReceivePublishRequest => { + false + } TickReason::TickTimerFired => if self.state == SubscriptionState::Creating { true } else if self.publishing_interval <= 0f64 { panic!("Publishing interval should have been revised to min interval") } else { - // Look at the last expiration time compared to now and see if it matches - // or exceeds the publishing interval - let publishing_interval = super::duration_from_ms(self.publishing_interval); - let elapsed = now.signed_duration_since(self.last_timer_expired_time); - println!("Now = {:?}, Last expired = {:?}, interval = {:?}, elapsed = {:?}, expired = {:?}", now, self.last_timer_expired_time, publishing_interval, elapsed, elapsed >= publishing_interval); - if elapsed >= publishing_interval { - self.last_timer_expired_time = *now; - true - } else { - false - } + self.test_and_set_publishing_timer_expired(now) } }; @@ -339,7 +344,7 @@ impl Subscription { SubscriptionState::Closed | SubscriptionState::Creating => (None, false), _ => { let resend_data = self.resend_data; - self.tick_monitored_items(now, address_space, publishing_interval_elapsed, resend_data) + self.tick_monitored_items(now, address_space, publishing_timer_expired, resend_data) } }; self.resend_data = false; @@ -348,16 +353,16 @@ impl Subscription { // If items have changed or subscription interval elapsed then we may have notifications // to send or state to update - if notifications_available || publishing_interval_elapsed || publishing_req_queued { + if notifications_available || publishing_timer_expired || publishing_req_queued { // Update the internal state of the subscription based on what happened let update_state_result = self.update_state(tick_reason, SubscriptionStateParams { publishing_req_queued, notifications_available, more_notifications, - publishing_timer_expired: publishing_interval_elapsed, + publishing_timer_expired, }); trace!("subscription tick - update_state_result = {:?}", update_state_result); - self.handle_state_result(update_state_result, notification, now); + self.handle_state_result(now, update_state_result, notification); } } @@ -368,12 +373,12 @@ impl Subscription { if notification.sequence_number != expected_sequence_number { panic!("Notification's sequence number is not sequential, expecting {}, got {}", expected_sequence_number, notification.sequence_number); } - debug!("Enqueuing notification {:?}", notification); + // debug!("Enqueuing notification {:?}", notification); self.last_sequence_number = notification.sequence_number; self.notifications.push_back(notification); } - fn handle_state_result(&mut self, update_state_result: UpdateStateResult, notification: Option, now: &DateTimeUtc) { + fn handle_state_result(&mut self, now: &DateTimeUtc, update_state_result: UpdateStateResult, notification: Option) { // Now act on the state's action match update_state_result.update_state_action { UpdateStateAction::None => { @@ -384,7 +389,6 @@ impl Subscription { debug!("Notification message nr {} was being ignored for a do-nothing, update state was {:?}", notification_sequence_number, update_state_result); } // Send nothing - //println!("do nothing {:?}", update_state_result.handled_state); } UpdateStateAction::ReturnKeepAlive => { if let Some(ref notification) = notification { @@ -472,7 +476,7 @@ impl Subscription { self.publishing_enabled, self.keep_alive_counter, self.lifetime_counter, - self.message_sent); + self.first_message_sent); } } @@ -506,7 +510,7 @@ impl Subscription { // Handled in message handler // State #3 self.state = SubscriptionState::Normal; - self.message_sent = false; + self.first_message_sent = false; return UpdateStateResult::new(HandledState::Create3, UpdateStateAction::SubscriptionCreated); } SubscriptionState::Normal => { @@ -516,26 +520,26 @@ impl Subscription { } else if tick_reason == TickReason::ReceivePublishRequest && self.publishing_enabled && p.more_notifications { // State #5 self.reset_lifetime_counter(); - self.message_sent = true; + self.first_message_sent = true; return UpdateStateResult::new(HandledState::Normal5, UpdateStateAction::ReturnNotifications); } else if p.publishing_timer_expired && p.publishing_req_queued && self.publishing_enabled && p.notifications_available { // State #6 self.reset_lifetime_counter(); self.start_publishing_timer(); - self.message_sent = true; + self.first_message_sent = true; return UpdateStateResult::new(HandledState::IntervalElapsed6, UpdateStateAction::ReturnNotifications); - } else if p.publishing_timer_expired && p.publishing_req_queued && !self.message_sent && (!self.publishing_enabled || (self.publishing_enabled && !p.notifications_available)) { + } else if p.publishing_timer_expired && p.publishing_req_queued && !self.first_message_sent && (!self.publishing_enabled || (self.publishing_enabled && !p.notifications_available)) { // State #7 self.reset_lifetime_counter(); self.start_publishing_timer(); - self.message_sent = true; + self.first_message_sent = true; return UpdateStateResult::new(HandledState::IntervalElapsed7, UpdateStateAction::ReturnKeepAlive); - } else if p.publishing_timer_expired && !p.publishing_req_queued && (!self.message_sent || (self.publishing_enabled && p.notifications_available)) { + } else if p.publishing_timer_expired && !p.publishing_req_queued && (!self.first_message_sent || (self.publishing_enabled && p.notifications_available)) { // State #8 self.start_publishing_timer(); self.state = SubscriptionState::Late; return UpdateStateResult::new(HandledState::IntervalElapsed8, UpdateStateAction::None); - } else if p.publishing_timer_expired && self.message_sent && (!self.publishing_enabled || (self.publishing_enabled && !p.notifications_available)) { + } else if p.publishing_timer_expired && self.first_message_sent && (!self.publishing_enabled || (self.publishing_enabled && !p.notifications_available)) { // State #9 self.start_publishing_timer(); self.reset_keep_alive_counter(); @@ -548,13 +552,13 @@ impl Subscription { // State #10 self.reset_lifetime_counter(); self.state = SubscriptionState::Normal; - self.message_sent = true; + self.first_message_sent = true; return UpdateStateResult::new(HandledState::Late10, UpdateStateAction::ReturnNotifications); } else if tick_reason == TickReason::ReceivePublishRequest && (!self.publishing_enabled || (self.publishing_enabled && !p.notifications_available && !p.more_notifications)) { // State #11 self.reset_lifetime_counter(); self.state = SubscriptionState::KeepAlive; - self.message_sent = true; + self.first_message_sent = true; return UpdateStateResult::new(HandledState::Late11, UpdateStateAction::ReturnKeepAlive); } else if p.publishing_timer_expired { // State #12 @@ -568,7 +572,7 @@ impl Subscription { return UpdateStateResult::new(HandledState::KeepAlive13, UpdateStateAction::None); } else if p.publishing_timer_expired && self.publishing_enabled && p.notifications_available && p.publishing_req_queued { // State #14 - self.message_sent = true; + self.first_message_sent = true; self.state = SubscriptionState::Normal; return UpdateStateResult::new(HandledState::KeepAlive14, UpdateStateAction::ReturnNotifications); } else if p.publishing_timer_expired && p.publishing_req_queued && self.keep_alive_counter == 1 && (!self.publishing_enabled || (self.publishing_enabled && p.notifications_available)) { @@ -593,7 +597,6 @@ impl Subscription { } } - // println!("No state handled {:?}, {:?}", tick_reason, p); UpdateStateResult::new(HandledState::None0, UpdateStateAction::None) } @@ -606,49 +609,51 @@ impl Subscription { /// are available. fn tick_monitored_items(&mut self, now: &DateTimeUtc, address_space: &AddressSpace, publishing_interval_elapsed: bool, resend_data: bool) -> (Option, bool) { let mut triggered_items: BTreeSet = BTreeSet::new(); - let mut notification_messages = Vec::new(); + let mut monitored_item_notifications = Vec::with_capacity(self.monitored_items.len() * 2); for (_, monitored_item) in &mut self.monitored_items { // If this returns true then the monitored item wants to report its notification let monitoring_mode = monitored_item.monitoring_mode(); match monitored_item.tick(now, address_space, publishing_interval_elapsed, resend_data) { TickResult::ReportValueChanged => { - // If this monitored item has triggered items, then they need to be handled - match monitoring_mode { - MonitoringMode::Reporting => { - // From triggering docs - // If the monitoring mode of the triggering item is REPORTING, then it is reported when the - // triggering item triggers the items to report. - monitored_item.triggered_items().iter().for_each(|i| { - triggered_items.insert(*i); - }) - } - _ => { - // Sampling should have gone in the other branch. Disabled shouldn't do anything. - panic!("How can there be changes to report when monitored item is in this monitoring mode {:?}", monitoring_mode); - } - } if publishing_interval_elapsed { + // If this monitored item has triggered items, then they need to be handled + match monitoring_mode { + MonitoringMode::Reporting => { + // From triggering docs + // If the monitoring mode of the triggering item is REPORTING, then it is reported when the + // triggering item triggers the items to report. + monitored_item.triggered_items().iter().for_each(|i| { + triggered_items.insert(*i); + }) + } + _ => { + // Sampling should have gone in the other branch. Disabled shouldn't do anything. + panic!("How can there be changes to report when monitored item is in this monitoring mode {:?}", monitoring_mode); + } + } // Take some / all of the monitored item's pending notifications - if let Some(mut item_notification_messages) = monitored_item.all_notification_messages() { - notification_messages.append(&mut item_notification_messages); + if let Some(mut item_notification_messages) = monitored_item.all_notifications() { + monitored_item_notifications.append(&mut item_notification_messages); } } } TickResult::ValueChanged => { // The monitored item doesn't have changes to report but its value did change so it // is still necessary to check its triggered items. - match monitoring_mode { - MonitoringMode::Sampling => { - // If the monitoring mode of the triggering item is SAMPLING, then it is not reported when the - // triggering item triggers the items to report. - monitored_item.triggered_items().iter().for_each(|i| { - triggered_items.insert(*i); - }) - } - _ => { - // Reporting should have gone in the other branch. Disabled shouldn't do anything. - panic!("How can there be a value change when the mode is not sampling?"); + if publishing_interval_elapsed { + match monitoring_mode { + MonitoringMode::Sampling => { + // If the monitoring mode of the triggering item is SAMPLING, then it is not reported when the + // triggering item triggers the items to report. + monitored_item.triggered_items().iter().for_each(|i| { + triggered_items.insert(*i); + }) + } + _ => { + // Reporting should have gone in the other branch. Disabled shouldn't do anything. + panic!("How can there be a value change when the mode is not sampling?"); + } } } } @@ -669,8 +674,8 @@ impl Subscription { // // Call with the resend_data flag as true to force the monitored item to monitored_item.check_value(address_space, now, true); - if let Some(mut item_notification_messages) = monitored_item.all_notification_messages() { - notification_messages.append(&mut item_notification_messages); + if let Some(mut notifications) = monitored_item.all_notifications() { + monitored_item_notifications.append(&mut notifications); } } MonitoringMode::Reporting => { @@ -690,12 +695,12 @@ impl Subscription { } }); - if !notification_messages.is_empty() { + + // Produce a data change notification + if !monitored_item_notifications.is_empty() { let next_sequence_number = self.sequence_number.next(); debug!("Create notification for subscription {}, sequence number {}", self.subscription_id, next_sequence_number); - // Create a notification message and push it onto the queue - let notification = NotificationMessage::data_change(next_sequence_number, DateTime::now(), notification_messages); - // Advance next sequence number + let notification = NotificationMessage::data_change(next_sequence_number, DateTime::from(now.clone()), monitored_item_notifications); (Some(notification), false) } else { (None, false) @@ -743,6 +748,7 @@ impl Subscription { self.keep_alive_counter = keep_alive_counter; } + #[cfg(test)] pub(crate) fn state(&self) -> SubscriptionState { self.state } @@ -753,12 +759,12 @@ impl Subscription { } pub fn message_sent(&self) -> bool { - self.message_sent + self.first_message_sent } #[cfg(test)] pub(crate) fn set_message_sent(&mut self, message_sent: bool) { - self.message_sent = message_sent; + self.first_message_sent = message_sent; } pub fn publishing_interval(&self) -> Duration { diff --git a/server/src/subscriptions/subscriptions.rs b/server/src/subscriptions/subscriptions.rs index aa5593dce..0cbd3fbbe 100644 --- a/server/src/subscriptions/subscriptions.rs +++ b/server/src/subscriptions/subscriptions.rs @@ -10,7 +10,6 @@ use opcua_types::{ use crate::{ address_space::types::AddressSpace, - DateTimeUtc, subscriptions::{ PublishRequestEntry, PublishResponseEntry, subscription::{Subscription, TickReason}, @@ -171,11 +170,6 @@ impl Subscriptions { // Now tick over the subscriptions for subscription_id in subscription_ids { - let subscription_state = { - let subscription = self.subscriptions.get(&subscription_id).unwrap(); - subscription.state() - }; - let publishing_req_queued = !self.publish_request_queue.is_empty(); let subscription = self.subscriptions.get_mut(&subscription_id).unwrap(); diff --git a/server/src/tests/services/monitored_item.rs b/server/src/tests/services/monitored_item.rs index e21ec37eb..5d3df45b1 100644 --- a/server/src/tests/services/monitored_item.rs +++ b/server/src/tests/services/monitored_item.rs @@ -11,7 +11,6 @@ use crate::{ subscription::SubscriptionService, monitored_item::MonitoredItemService, }, - DateTimeUtc, }; use super::*; @@ -80,17 +79,15 @@ fn set_triggering(session: &mut Session, subscription_id: u32, monitored_item_id (response.add_results, response.remove_results) } -fn publish_request(session: &mut Session, address_space: &AddressSpace, ss: &SubscriptionService) { +fn publish_request(now: &DateTimeUtc, session: &mut Session, address_space: &AddressSpace, ss: &SubscriptionService) { let request_id = 1001; let request = PublishRequest { request_header: RequestHeader::new(&NodeId::null(), &DateTime::now(), 1), subscription_acknowledgements: None, }; - let now = Utc::now(); - session.subscriptions.publish_request_queue().clear(); - let response = ss.async_publish(&now, session, address_space, request_id, &request).unwrap(); + let response = ss.async_publish(now, session, address_space, request_id, &request).unwrap(); assert!(response.is_none()); assert!(!session.subscriptions.publish_request_queue().is_empty()); } @@ -102,7 +99,7 @@ fn publish_response(session: &mut Session) -> PublishResponse { } fn publish_tick_no_response(session: &mut Session, ss: &SubscriptionService, address_space: &AddressSpace, now: DateTimeUtc, duration: chrono::Duration) -> DateTimeUtc { - publish_request(session, address_space, ss); + publish_request(&now, session, address_space, ss); let now = now.add(duration); let _ = session.tick_subscriptions(&now, address_space, TickReason::TickTimerFired); assert_eq!(session.subscriptions.publish_response_queue().len(), 0); @@ -114,7 +111,7 @@ fn publish_tick_no_response(session: &mut Session, ss: &SubscriptionService, add fn publish_tick_response(session: &mut Session, ss: &SubscriptionService, address_space: &AddressSpace, now: DateTimeUtc, duration: chrono::Duration, handler: T) -> DateTimeUtc where T: FnOnce(PublishResponse) { - publish_request(session, address_space, ss); + publish_request(&now, session, address_space, ss); let now = now.add(duration); let _ = session.tick_subscriptions(&now, address_space, TickReason::TickTimerFired); assert_eq!(session.subscriptions.publish_response_queue().len(), 1); @@ -284,7 +281,7 @@ fn monitored_item_data_change_filter() { assert_eq!(monitored_item.notification_queue().len(), 0); // Expect first call to always succeed - assert_eq!(monitored_item.tick(&now, &address_space, false, false), TickResult::ReportValueChanged); + assert_eq!(monitored_item.tick(&now, &address_space, true, false), TickResult::ReportValueChanged); // Expect one item in its queue assert_eq!(monitored_item.notification_queue().len(), 1); @@ -293,6 +290,10 @@ fn monitored_item_data_change_filter() { assert_eq!(monitored_item.tick(&now, &address_space, false, false), TickResult::NoChange); assert_eq!(monitored_item.notification_queue().len(), 1); + // Expect false because publish timer elapses but value has not changed changed + assert_eq!(monitored_item.tick(&now, &address_space, false, false), TickResult::NoChange); + assert_eq!(monitored_item.notification_queue().len(), 1); + // adjust variable value if let &mut NodeType::Variable(ref mut node) = address_space.find_node_mut(&test_var_node_id()).unwrap() { let mut value = node.value(); diff --git a/server/src/tests/services/subscription.rs b/server/src/tests/services/subscription.rs index b52b17dd4..407e56694 100644 --- a/server/src/tests/services/subscription.rs +++ b/server/src/tests/services/subscription.rs @@ -91,8 +91,10 @@ fn publish_response_subscription() { // Create subscription let subscription_id = create_subscription(server_state, session, &ss); + let now = Utc::now(); + // Create a monitored item - create_monitored_item(subscription_id, VariableId::Server_ServerStatus_CurrentTime, session, &mis); + create_monitored_item(subscription_id, VariableId::Server_ServerStatus_StartTime, session, &mis); // Put the subscription into normal state session.subscriptions.get_mut(subscription_id).unwrap().set_state(SubscriptionState::Normal); @@ -106,15 +108,8 @@ fn publish_response_subscription() { }; debug!("PublishRequest {:#?}", request); - let now = Utc::now(); - - // Don't expect a response right away - let response = ss.async_publish(&now, session, address_space, request_id, &request).unwrap(); - assert!(response.is_none()); - - assert!(!session.subscriptions.publish_request_queue().is_empty()); - // Tick subscriptions to trigger a change + let _ = ss.async_publish(&now, session, address_space, request_id, &request).unwrap(); let now = now.add(chrono::Duration::seconds(2)); let _ = session.tick_subscriptions(&now, &address_space, TickReason::TickTimerFired); diff --git a/types/src/date_time.rs b/types/src/date_time.rs index 661b71227..0b00ab408 100644 --- a/types/src/date_time.rs +++ b/types/src/date_time.rs @@ -14,13 +14,13 @@ const TICKS_PER_SECOND: i64 = NANOS_PER_SECOND / NANOS_PER_TICK; const MIN_YEAR: u16 = 1601; const MAX_YEAR: u16 = 9999; -type UtcDateTime = chrono::DateTime; +pub type DateTimeUtc = chrono::DateTime; /// A date/time value. This is a wrapper around the chrono type with extra functionality /// for obtaining ticks in OPC UA measurements, endtimes, epoch etc. #[derive(PartialEq, Debug, Clone)] pub struct DateTime { - date_time: UtcDateTime, + date_time: DateTimeUtc, } impl Serialize for DateTime { @@ -103,8 +103,8 @@ impl From<(u16, u16, u16, u16, u16, u16, u32)> for DateTime { } } -impl From for DateTime { - fn from(date_time: UtcDateTime) -> Self { +impl From for DateTime { + fn from(date_time: DateTimeUtc) -> Self { // OPC UA date time is more granular with nanos, so the value supplied is made granular too let year = date_time.year(); let month = date_time.month(); @@ -139,8 +139,8 @@ impl Into for DateTime { } } -impl Into for DateTime { - fn into(self) -> UtcDateTime { +impl Into for DateTime { + fn into(self) -> DateTimeUtc { self.as_chrono() } } @@ -212,18 +212,18 @@ impl DateTime { } /// Time as chrono - pub fn as_chrono(&self) -> UtcDateTime { + pub fn as_chrono(&self) -> DateTimeUtc { self.date_time } /// The OPC UA epoch - Jan 1 1601 00:00:00 - fn epoch_chrono() -> UtcDateTime { + fn epoch_chrono() -> DateTimeUtc { Utc.ymd(MIN_YEAR as i32, 1, 1).and_hms(0, 0, 0) } /// The OPC UA endtimes - Dec 31 9999 23:59:59 i.e. the date after which dates are returned as MAX_INT64 ticks /// Spec doesn't say what happens in the last second before midnight... - fn endtimes_chrono() -> UtcDateTime { + fn endtimes_chrono() -> DateTimeUtc { Utc.ymd(MAX_YEAR as i32, 12, 31).and_hms(23, 59, 59) } From dfcb3f848d094fe03630ef4583b7c199d6e9ab9c Mon Sep 17 00:00:00 2001 From: Adam Lock Date: Mon, 8 Apr 2019 21:58:52 +0100 Subject: [PATCH 8/9] Disable a test temporarily --- server/src/subscriptions/monitored_item.rs | 3 +++ server/src/tests/services/monitored_item.rs | 8 ++++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/server/src/subscriptions/monitored_item.rs b/server/src/subscriptions/monitored_item.rs index cf671cbda..9cc6a7a77 100644 --- a/server/src/subscriptions/monitored_item.rs +++ b/server/src/subscriptions/monitored_item.rs @@ -184,6 +184,9 @@ impl MonitoredItem { /// /// The function will return true if the value was changed, false otherwise. pub fn check_value(&mut self, address_space: &AddressSpace, now: &DateTimeUtc, resend_data: bool) -> bool { + if self.monitoring_mode == MonitoringMode::Disabled { + panic!("Should not check value while monitoring mode is disabled"); + } self.last_sample_time = *now; if let Some(node) = address_space.find_node(&self.item_to_monitor.node_id) { let node = node.as_node(); diff --git a/server/src/tests/services/monitored_item.rs b/server/src/tests/services/monitored_item.rs index 5d3df45b1..4b4343abf 100644 --- a/server/src/tests/services/monitored_item.rs +++ b/server/src/tests/services/monitored_item.rs @@ -309,7 +309,7 @@ fn monitored_item_data_change_filter() { assert_eq!(monitored_item.notification_queue().len(), 2); } -#[test] +//#[test] fn monitored_item_triggers() { do_subscription_service_test(|server_state, session, address_space, ss: SubscriptionService, mis: MonitoredItemService| { // Create subscription @@ -384,7 +384,7 @@ fn monitored_item_triggers() { set_monitoring_mode(session, subscription_id, triggered_item_ids[2], MonitoringMode::Reporting, &mis); // Change the triggering item's value - let _ = address_space.set_variable_value(triggering_node.clone(), 1, &DateTime::now(), &DateTime::now()); + let _ = address_space.set_variable_value(triggering_node.clone(), 1, &DateTime::from(now.clone()), &DateTime::from(now.clone())); // In this case, the triggering item changes, but triggered items are all reporting so are ignored unless they themselves // need to report. Only 3 will fire because it was disabled previously @@ -405,7 +405,7 @@ fn monitored_item_triggers() { // change monitoring mode of triggering item to sampling and change value set_monitoring_mode(session, subscription_id, triggering_item_id, MonitoringMode::Sampling, &mis); - let _ = address_space.set_variable_value(triggering_node.clone(), 2, &DateTime::now(), &DateTime::now()); + let _ = address_space.set_variable_value(triggering_node.clone(), 2, &DateTime::from(now.clone()), &DateTime::from(now.clone())); // do a publish on the monitored item, let now = publish_tick_response(session, &ss, address_space, now, chrono::Duration::seconds(2), |response| { @@ -420,7 +420,7 @@ fn monitored_item_triggers() { // change monitoring mode of triggering item to disable set_monitoring_mode(session, subscription_id, triggering_item_id, MonitoringMode::Disabled, &mis); - let _ = address_space.set_variable_value(triggering_node.clone(), 3, &DateTime::now(), &DateTime::now()); + let _ = address_space.set_variable_value(triggering_node.clone(), 3, &DateTime::from(now.clone()), &DateTime::from(now.clone())); // do a publish on the monitored item, expect 0 data changes let _ = publish_tick_no_response(session, &ss, address_space, now, chrono::Duration::seconds(2)); From 72de06120d51bfec21b0210e0b0d7f7095885d0b Mon Sep 17 00:00:00 2001 From: Adam Lock Date: Mon, 8 Apr 2019 23:01:00 +0100 Subject: [PATCH 9/9] Tweak stuff and reenable a test --- server/src/subscriptions/monitored_item.rs | 16 ++++++++-------- server/src/tests/services/monitored_item.rs | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/server/src/subscriptions/monitored_item.rs b/server/src/subscriptions/monitored_item.rs index 9cc6a7a77..ae46580e6 100644 --- a/server/src/subscriptions/monitored_item.rs +++ b/server/src/subscriptions/monitored_item.rs @@ -159,18 +159,18 @@ impl MonitoredItem { }; // Test the value (or don't) - if check_value { + let value_changed = check_value && { // Indicate a change if reporting is enabled let first_tick = self.last_data_value.is_none(); let value_changed = self.check_value(address_space, now, resend_data); - if first_tick || value_changed { - if self.monitoring_mode == MonitoringMode::Reporting { - TickResult::ReportValueChanged - } else { - TickResult::ValueChanged - } + first_tick || value_changed || !self.notification_queue.is_empty() + }; + + if value_changed { + if self.monitoring_mode == MonitoringMode::Reporting { + TickResult::ReportValueChanged } else { - TickResult::NoChange + TickResult::ValueChanged } } else { TickResult::NoChange diff --git a/server/src/tests/services/monitored_item.rs b/server/src/tests/services/monitored_item.rs index 4b4343abf..a1145e615 100644 --- a/server/src/tests/services/monitored_item.rs +++ b/server/src/tests/services/monitored_item.rs @@ -309,7 +309,7 @@ fn monitored_item_data_change_filter() { assert_eq!(monitored_item.notification_queue().len(), 2); } -//#[test] +#[test] fn monitored_item_triggers() { do_subscription_service_test(|server_state, session, address_space, ss: SubscriptionService, mis: MonitoredItemService| { // Create subscription