From 473a79ca5875c16df87e22735c48811c97832118 Mon Sep 17 00:00:00 2001 From: Adam Lock Date: Thu, 4 Apr 2019 23:49:02 +0100 Subject: [PATCH] Subscription refactor work --- client/src/session.rs | 175 +++++++++++++++++- server/src/subscriptions/mod.rs | 11 +- server/src/subscriptions/subscription.rs | 102 +++++----- server/src/subscriptions/subscriptions.rs | 14 +- server/src/tests/mod.rs | 4 +- .../src/tests/subscriptions/subscription.rs | 12 +- 6 files changed, 247 insertions(+), 71 deletions(-) diff --git a/client/src/session.rs b/client/src/session.rs index 781eac694..f979c7ad2 100644 --- a/client/src/session.rs +++ b/client/src/session.rs @@ -8,7 +8,7 @@ use std::{ result::Result, collections::HashSet, str::FromStr, - sync::{Arc, Mutex, RwLock}, + sync::{Arc, Mutex, RwLock, mpsc}, time::{Instant, Duration}, }; use futures::{ @@ -410,22 +410,43 @@ impl Session { self.transport.is_connected() } + const POLL_SLEEP_INTERVAL: u64 = 50; + /// Runs a polling loop for this session to perform periodic activity such as processing subscriptions, /// as well as recovering from connection errors. The run command will break if the session is disconnected /// and cannot be reestablished. /// /// # Returns /// - /// * `Ok(bool)` - returns `true` if an action was performed, `false` if no action was performed - /// but polling slept for a little bit. - /// * `Err(StatusCode)` - Request failed, status code is the reason for failure + /// * `mpsc::Sender<()>` - A sender that allows the caller to send a single unity message to the + /// run loop to cause it to abort. /// - pub fn run(session: Arc>) { - const POLL_SLEEP_INTERVAL: u64 = 50; + pub fn run(session: Arc>) -> mpsc::Sender<()> { + let (tx, rx) = mpsc::channel(); + Self::run_loop(session, Self::POLL_SLEEP_INTERVAL, rx); + tx + } + + /// Runs the server asynchronously by spawning a thread for it to run on, allowing the calling + /// thread to proceed to do other things. + pub fn run_async(session: Arc>) -> mpsc::Sender<()> { + let (tx, rx) = mpsc::channel(); + thread::spawn(move || { + Self::run_loop(session, Self::POLL_SLEEP_INTERVAL, rx) + }); + tx + } + + /// Main running loop for a session + pub fn run_loop(session: Arc>, sleep_interval: u64, rx: mpsc::Receiver<()>) { loop { // Main thread has nothing to do - just wait for publish events to roll in let mut session = session.write().unwrap(); - if session.poll(POLL_SLEEP_INTERVAL).is_err() { + if rx.try_recv().is_ok() { + info!("Run session was terminated by a message"); + break; + } + if session.poll(sleep_interval).is_err() { // Break the loop if connection goes down info!("Connection to server broke, so terminating"); break; @@ -944,7 +965,17 @@ impl Session { } } - /// Sends a ReadRequest to the server + /// Reads the value of nodes by sending a [`ReadRequest`] to the server. + /// + /// # Arguments + /// + /// * `nodes_to_read` - A list of `ReadValueId` to be read by the server. + /// + /// # Returns + /// + /// * `Ok(Vec)` - A list of datavalues corresponding to each read operation. + /// * `Err(StatusCode)` - Status code reason for failure. + /// pub fn read(&mut self, nodes_to_read: &[ReadValueId]) -> Result>, StatusCode> { if nodes_to_read.is_empty() { // No subscriptions @@ -970,7 +1001,17 @@ impl Session { } } - /// Sends a WriteRequest to the server + /// Writes values to nodes by sending a [`WriteRequest`] to the server. + /// + /// # Arguments + /// + /// * `nodes_to_write` - A list of `WriteValue` to be sent to the server. + /// + /// # Returns + /// + /// * `Ok(Vec)` - A list of results corresponding to each write operation. + /// * `Err(StatusCode)` - Status code reason for failure. + /// pub fn write_value(&mut self, nodes_to_write: &[WriteValue]) -> Result>, StatusCode> { if nodes_to_write.is_empty() { // No subscriptions @@ -993,6 +1034,122 @@ impl Session { } } + /// Add nodes by sending a [`AddNodesRequest`] to the server. + /// + /// # Arguments + /// + /// * `nodes_to_add` - A list of `AddNodesItem` to be added to the server. + /// + /// # Returns + /// + /// * `Ok(Vec)` - Result for each add node operation. + /// * `Err(StatusCode)` - Status code reason for failure. + /// + pub fn add_nodes(&mut self, nodes_to_add: &[AddNodesItem]) -> Result, StatusCode> { + if nodes_to_add.is_empty() { + error!("add_nodes, called with no nodes to add"); + Err(StatusCode::BadNothingToDo) + } else { + let request = AddNodesRequest { + request_header: self.make_request_header(), + nodes_to_add: Some(nodes_to_add.to_vec()), + }; + let response = self.send_request(request)?; + if let SupportedMessage::AddNodesResponse(response) = response { + Ok(response.results.unwrap()) + } else { + Err(crate::process_unexpected_response(response)) + } + } + } + + /// Add references by sending a [`AddReferencesRequest`] to the server. + /// + /// # Arguments + /// + /// * `references_to_add` - A list of `AddReferencesItem` to be sent to the server. + /// + /// # Returns + /// + /// * `Ok(Vec)` - Result for each add reference operation. + /// * `Err(StatusCode)` - Status code reason for failure. + /// + pub fn add_references(&mut self, references_to_add: &[AddReferencesItem]) -> Result, StatusCode> { + if references_to_add.is_empty() { + error!("add_references, called with no references to add"); + Err(StatusCode::BadNothingToDo) + } else { + let request = AddReferencesRequest { + request_header: self.make_request_header(), + references_to_add: Some(references_to_add.to_vec()), + }; + let response = self.send_request(request)?; + if let SupportedMessage::AddReferencesResponse(response) = response { + Ok(response.results.unwrap()) + } else { + Err(crate::process_unexpected_response(response)) + } + } + } + + /// Delete nodes by sending a [`DeleteNodesRequest`] to the server. + /// + /// # Arguments + /// + /// * `nodes_to_delete` - A list of `DeleteNodesItem` to be sent to the server. + /// + /// # Returns + /// + /// * `Ok(Vec)` - Result for each delete node operation. + /// * `Err(StatusCode)` - Status code reason for failure. + /// + pub fn delete_nodes(&mut self, nodes_to_delete: &[DeleteNodesItem]) -> Result, StatusCode> { + if nodes_to_delete.is_empty() { + error!("delete_nodes, called with no nodes to delete"); + Err(StatusCode::BadNothingToDo) + } else { + let request = DeleteNodesRequest { + request_header: self.make_request_header(), + nodes_to_delete: Some(nodes_to_delete.to_vec()), + }; + let response = self.send_request(request)?; + if let SupportedMessage::DeleteNodesResponse(response) = response { + Ok(response.results.unwrap()) + } else { + Err(crate::process_unexpected_response(response)) + } + } + } + + /// Delete references by sending a [`DeleteReferencesRequest`] to the server. + /// + /// # Arguments + /// + /// * `nodes_to_delete` - A list of `DeleteReferencesItem` to be sent to the server. + /// + /// # Returns + /// + /// * `Ok(Vec)` - Result for each delete node operation. + /// * `Err(StatusCode)` - Status code reason for failure. + /// + pub fn delete_references(&mut self, references_to_delete: &[DeleteReferencesItem]) -> Result, StatusCode> { + if references_to_delete.is_empty() { + error!("delete_references, called with no references to delete"); + Err(StatusCode::BadNothingToDo) + } else { + let request = DeleteReferencesRequest { + request_header: self.make_request_header(), + references_to_delete: Some(references_to_delete.to_vec()), + }; + let response = self.send_request(request)?; + if let SupportedMessage::DeleteReferencesResponse(response) = response { + Ok(response.results.unwrap()) + } else { + Err(crate::process_unexpected_response(response)) + } + } + } + /// Create a subscription by sending a [`CreateSubscriptionRequest`] to the server. /// /// # Arguments diff --git a/server/src/subscriptions/mod.rs b/server/src/subscriptions/mod.rs index c18d7b195..17e4f9bb2 100644 --- a/server/src/subscriptions/mod.rs +++ b/server/src/subscriptions/mod.rs @@ -1,5 +1,8 @@ -use opcua_types::SupportedMessage; -use opcua_types::service_types::PublishRequest; +use opcua_types::{ + SupportedMessage, + status_code::StatusCode, + service_types::PublishRequest, +}; /// The publish request entry preserves the request_id which is part of the chunk layer but clients /// are fickle about receiving responses from the same as the request. Normally this is easy because @@ -7,8 +10,12 @@ use opcua_types::service_types::PublishRequest; /// so that later we can send out responses that have the proper req id #[derive(Clone)] pub struct PublishRequestEntry { + // The request id pub request_id: u32, + // The request itself pub request: PublishRequest, + // The result of clearing acknowledgments when the request was received. + pub results: Option>, } #[derive(Clone, Debug, PartialEq)] diff --git a/server/src/subscriptions/subscription.rs b/server/src/subscriptions/subscription.rs index c91a45fe7..24c4ce62e 100644 --- a/server/src/subscriptions/subscription.rs +++ b/server/src/subscriptions/subscription.rs @@ -95,10 +95,19 @@ impl UpdateStateResult { #[derive(Debug, Copy, Clone, PartialEq)] pub(crate) enum TickReason { - ReceivedPublishRequest, + ReceivePublishRequest, +// PublishingTimerExpires, TickTimerFired, } +#[derive(Debug, Copy, Clone, PartialEq)] +pub(crate) enum StateEvent +{ + ReceivePublishRequest, + PublishingTimerExpires, +} + + #[derive(Debug, Clone, Serialize)] pub struct Subscription { /// Subscription id @@ -304,7 +313,7 @@ impl Subscription { pub(crate) fn tick(&mut self, address_space: &AddressSpace, tick_reason: TickReason, publishing_req_queued: bool, now: &DateTimeUtc) { // Check if the publishing interval has elapsed. Only checks on the tick timer. let publishing_interval_elapsed = match tick_reason { - TickReason::ReceivedPublishRequest => false, + TickReason::ReceivePublishRequest => false, TickReason::TickTimerFired => if self.state == SubscriptionState::Creating { true } else if self.publishing_interval <= 0f64 { @@ -335,7 +344,7 @@ impl Subscription { }; self.resend_data = false; - let notifications_available = notification.is_some(); + let notifications_available = !self.notifications.is_empty() || notification.is_some(); // If items have changed or subscription interval elapsed then we may have notifications // to send or state to update @@ -348,47 +357,50 @@ impl Subscription { publishing_timer_expired: publishing_interval_elapsed, }); trace!("subscription tick - update_state_result = {:?}", update_state_result); + self.handle_state_result(update_state_result, notification, now); + } + } - // Now act on the state's action - match update_state_result.update_state_action { - UpdateStateAction::None => { - if let Some(ref notification) = notification { - // Reset the next sequence number to the discarded notification - let notification_sequence_number = notification.sequence_number; - self.sequence_number.set_next(notification_sequence_number); - debug!("Notification message nr {} was being ignored for a do-nothing, update state was {:?}", notification_sequence_number, update_state_result); - } - // Send nothing - //println!("do nothing {:?}", update_state_result.handled_state); + fn handle_state_result(&mut self, update_state_result: UpdateStateResult, notification: Option, now: &DateTimeUtc) { + // Now act on the state's action + match update_state_result.update_state_action { + UpdateStateAction::None => { + if let Some(ref notification) = notification { + // Reset the next sequence number to the discarded notification + let notification_sequence_number = notification.sequence_number; + self.sequence_number.set_next(notification_sequence_number); + debug!("Notification message nr {} was being ignored for a do-nothing, update state was {:?}", notification_sequence_number, update_state_result); } - UpdateStateAction::ReturnKeepAlive => { - if let Some(ref notification) = notification { - // Reset the next sequence number to the discarded notification - let notification_sequence_number = notification.sequence_number; - self.sequence_number.set_next(notification_sequence_number); - debug!("Notification message nr {} was being ignored for a keep alive, update state was {:?}", notification_sequence_number, update_state_result); - } - // Send a keep alive - debug!("Sending keep alive response"); - self.notifications.push_back(NotificationMessage::keep_alive(self.sequence_number.next(), DateTime::from(now.clone()))); - } - UpdateStateAction::ReturnNotifications => { - // Add the notification message to the queue - if let Some(notification) = notification { - self.notifications.push_back(notification); - } - } - UpdateStateAction::SubscriptionCreated => { - // Subscription was created successfully - self.notifications.push_back(NotificationMessage::status_change(self.sequence_number.next(), DateTime::from(now.clone()), StatusCode::Good)) + // Send nothing + //println!("do nothing {:?}", update_state_result.handled_state); + } + UpdateStateAction::ReturnKeepAlive => { + if let Some(ref notification) = notification { + // Reset the next sequence number to the discarded notification + let notification_sequence_number = notification.sequence_number; + self.sequence_number.set_next(notification_sequence_number); + debug!("Notification message nr {} was being ignored for a keep alive, update state was {:?}", notification_sequence_number, update_state_result); } - UpdateStateAction::SubscriptionExpired => { - // Delete the monitored items, issue a status change for the subscription - debug!("Subscription status change to closed / timeout"); - self.monitored_items.clear(); - self.notifications.push_back(NotificationMessage::status_change(self.sequence_number.next(), DateTime::from(now.clone()), StatusCode::BadTimeout)) + // Send a keep alive + debug!("Sending keep alive response"); + self.notifications.push_back(NotificationMessage::keep_alive(self.sequence_number.next(), DateTime::from(now.clone()))); + } + UpdateStateAction::ReturnNotifications => { + // Add the notification message to the queue + if let Some(notification) = notification { + self.notifications.push_back(notification); } } + UpdateStateAction::SubscriptionCreated => { + // Subscription was created successfully + self.notifications.push_back(NotificationMessage::status_change(self.sequence_number.next(), DateTime::from(now.clone()), StatusCode::Good)) + } + UpdateStateAction::SubscriptionExpired => { + // Delete the monitored items, issue a status change for the subscription + debug!("Subscription status change to closed / timeout"); + self.monitored_items.clear(); + self.notifications.push_back(NotificationMessage::status_change(self.sequence_number.next(), DateTime::from(now.clone()), StatusCode::BadTimeout)) + } } } @@ -421,7 +433,7 @@ impl Subscription { pub(crate) fn update_state(&mut self, tick_reason: TickReason, p: SubscriptionStateParams) -> UpdateStateResult { // This function is called when a publish request is received OR the timer expired, so getting // both is invalid code somewhere - if tick_reason == TickReason::ReceivedPublishRequest && p.publishing_timer_expired { + if tick_reason == TickReason::ReceivePublishRequest && p.publishing_timer_expired { panic!("Should not be possible for timer to have expired and received publish request at same time") } @@ -477,10 +489,10 @@ impl Subscription { return UpdateStateResult::new(HandledState::Create3, UpdateStateAction::SubscriptionCreated); } SubscriptionState::Normal => { - if tick_reason == TickReason::ReceivedPublishRequest && (!self.publishing_enabled || (self.publishing_enabled && !p.more_notifications)) { + if tick_reason == TickReason::ReceivePublishRequest && (!self.publishing_enabled || (self.publishing_enabled && !p.more_notifications)) { // State #4 return UpdateStateResult::new(HandledState::Normal4, UpdateStateAction::None); - } else if tick_reason == TickReason::ReceivedPublishRequest && self.publishing_enabled && p.more_notifications { + } else if tick_reason == TickReason::ReceivePublishRequest && self.publishing_enabled && p.more_notifications { // State #5 self.reset_lifetime_counter(); self.message_sent = true; @@ -511,13 +523,13 @@ impl Subscription { } } SubscriptionState::Late => { - if tick_reason == TickReason::ReceivedPublishRequest && self.publishing_enabled && (p.notifications_available || p.more_notifications) { + if tick_reason == TickReason::ReceivePublishRequest && self.publishing_enabled && (p.notifications_available || p.more_notifications) { // State #10 self.reset_lifetime_counter(); self.state = SubscriptionState::Normal; self.message_sent = true; return UpdateStateResult::new(HandledState::Late10, UpdateStateAction::ReturnNotifications); - } else if tick_reason == TickReason::ReceivedPublishRequest && (!self.publishing_enabled || (self.publishing_enabled && !p.notifications_available && !p.more_notifications)) { + } else if tick_reason == TickReason::ReceivePublishRequest && (!self.publishing_enabled || (self.publishing_enabled && !p.notifications_available && !p.more_notifications)) { // State #11 self.reset_lifetime_counter(); self.state = SubscriptionState::KeepAlive; @@ -530,7 +542,7 @@ impl Subscription { } } SubscriptionState::KeepAlive => { - if tick_reason == TickReason::ReceivedPublishRequest { + if tick_reason == TickReason::ReceivePublishRequest { // State #13 return UpdateStateResult::new(HandledState::KeepAlive13, UpdateStateAction::None); } else if p.publishing_timer_expired && self.publishing_enabled && p.notifications_available && p.publishing_req_queued { diff --git a/server/src/subscriptions/subscriptions.rs b/server/src/subscriptions/subscriptions.rs index 810bcd688..2ea05b1a8 100644 --- a/server/src/subscriptions/subscriptions.rs +++ b/server/src/subscriptions/subscriptions.rs @@ -106,13 +106,15 @@ impl Subscriptions { } Err(StatusCode::BadTooManyPublishRequests) } else { - - + // Clear all acknowledged items here + // Acknowledge results + let results = self.process_subscription_acknowledgements(&request); // Add to the front of the queue - older items are popped from the back self.publish_request_queue.push_front(PublishRequestEntry { request_id, request, + results }); Ok(()) } @@ -217,9 +219,7 @@ impl Subscriptions { self.retransmission_queue.insert((subscription_id, notification_message.sequence_number), notification_message.clone()); // Acknowledge results - let results = self.process_subscription_acknowledgements(&publish_request.request); - - let response = self.make_publish_response(&publish_request, subscription_id, now, notification_message, more_notifications, available_sequence_numbers, results); + let response = self.make_publish_response(publish_request, subscription_id, now, notification_message, more_notifications, available_sequence_numbers); self.publish_response_queue.push_back(response); } @@ -329,7 +329,7 @@ impl Subscriptions { } } - fn make_publish_response(&self, publish_request: &PublishRequestEntry, subscription_id: u32, now: &DateTimeUtc, notification_message: NotificationMessage, more_notifications: bool, available_sequence_numbers: Option>, results: Option>) -> PublishResponseEntry { + fn make_publish_response(&self, publish_request: PublishRequestEntry, subscription_id: u32, now: &DateTimeUtc, notification_message: NotificationMessage, more_notifications: bool, available_sequence_numbers: Option>) -> PublishResponseEntry { let now = DateTime::from(now.clone()); PublishResponseEntry { request_id: publish_request.request_id, @@ -339,7 +339,7 @@ impl Subscriptions { available_sequence_numbers, more_notifications, notification_message, - results, + results: publish_request.results, diagnostic_infos: None, }.into(), } diff --git a/server/src/tests/mod.rs b/server/src/tests/mod.rs index f91dcecb6..00d44789e 100644 --- a/server/src/tests/mod.rs +++ b/server/src/tests/mod.rs @@ -110,6 +110,7 @@ pub fn expired_publish_requests() { request_header: RequestHeader::new(&NodeId::null(), &now, 1000), subscription_acknowledgements: None, }, + results: None, }; pr1.request.request_header.timeout_hint = 5001; @@ -119,6 +120,7 @@ pub fn expired_publish_requests() { request_header: RequestHeader::new(&NodeId::null(), &now, 2000), subscription_acknowledgements: None, }, + results: None, }; pr2.request.request_header.timeout_hint = 3000; @@ -160,6 +162,4 @@ pub fn expired_publish_requests() { panic!("Expected service faults for timed out publish requests") } } - - } \ No newline at end of file diff --git a/server/src/tests/subscriptions/subscription.rs b/server/src/tests/subscriptions/subscription.rs index 6888c5347..d96250f84 100644 --- a/server/src/tests/subscriptions/subscription.rs +++ b/server/src/tests/subscriptions/subscription.rs @@ -61,7 +61,7 @@ fn update_state_4() { // (PublishingEnabled == TRUE // && MoreNotifications == FALSE) // ) - let tick_reason = TickReason::ReceivedPublishRequest; + let tick_reason = TickReason::ReceivePublishRequest; let p = SubscriptionStateParams { notifications_available: true, more_notifications: false, @@ -94,7 +94,7 @@ fn update_state_5() { // set publish enabled true // set more notifications true - let tick_reason = TickReason::ReceivedPublishRequest; + let tick_reason = TickReason::ReceivePublishRequest; let p = SubscriptionStateParams { notifications_available: true, more_notifications: true, @@ -234,7 +234,7 @@ fn update_state_9() { fn update_state_10() { let mut s = make_subscription(SubscriptionState::Late); - let tick_reason = TickReason::ReceivedPublishRequest; + let tick_reason = TickReason::ReceivePublishRequest; let p = SubscriptionStateParams { notifications_available: true, more_notifications: false, @@ -256,7 +256,7 @@ fn update_state_10() { fn update_state_11() { let mut s = make_subscription(SubscriptionState::Late); - let tick_reason = TickReason::ReceivedPublishRequest; + let tick_reason = TickReason::ReceivePublishRequest; let p = SubscriptionStateParams { notifications_available: false, more_notifications: false, @@ -299,7 +299,7 @@ fn update_state_12() { fn update_state_13() { let mut s = make_subscription(SubscriptionState::KeepAlive); - let tick_reason = TickReason::ReceivedPublishRequest; + let tick_reason = TickReason::ReceivePublishRequest; let p = SubscriptionStateParams { notifications_available: false, more_notifications: false, @@ -416,7 +416,7 @@ fn update_state_27() { // set publish enabled true // set more notifications true - let tick_reason = TickReason::ReceivedPublishRequest; + let tick_reason = TickReason::ReceivePublishRequest; let p = SubscriptionStateParams { notifications_available: true, more_notifications: true,