diff --git a/client/src/callbacks.rs b/client/src/callbacks.rs index c846d88ae..a75791dad 100644 --- a/client/src/callbacks.rs +++ b/client/src/callbacks.rs @@ -32,7 +32,7 @@ use crate::subscription::MonitoredItem; pub trait OnSubscriptionNotification { /// Called by the subscription after a `DataChangeNotification`. The default implementation /// does nothing. - fn on_data_change(&mut self, _data_change_items: Vec<&MonitoredItem>) {} + fn on_data_change(&mut self, _data_change_items: &Vec<&MonitoredItem>) {} /// Called by the subscription after a `EventNotificationList`. The notifications contained within /// are individual `EventFieldList` structs filled from the select clause criteria from when the @@ -64,11 +64,11 @@ pub trait OnSessionClosed { /// a data change occurs. pub struct DataChangeCallback { /// The actual call back - cb: Box) + Send + Sync + 'static>, + cb: Box) + Send + Sync + 'static>, } impl OnSubscriptionNotification for DataChangeCallback { - fn on_data_change(&mut self, data_change_items: Vec<&MonitoredItem>) { + fn on_data_change(&mut self, data_change_items: &Vec<&MonitoredItem>) { (self.cb)(data_change_items); } } @@ -77,7 +77,7 @@ impl DataChangeCallback { /// Constructs a callback from the supplied function pub fn new(cb: CB) -> Self where - CB: Fn(Vec<&MonitoredItem>) + Send + Sync + 'static, + CB: Fn(&Vec<&MonitoredItem>) + Send + Sync + 'static, { Self { cb: Box::new(cb) } } diff --git a/client/src/lib.rs b/client/src/lib.rs index 93ff6e43a..9481d91ab 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -79,7 +79,7 @@ //! //! fn print_value(item: &MonitoredItem) { //! let node_id = &item.item_to_monitor().node_id; -//! let data_value = item.value(); +//! let data_value = item.last_value(); //! if let Some(ref value) = data_value.value { //! println!("Item \"{}\", Value = {:?}", node_id, value); //! } else { diff --git a/client/src/session.rs b/client/src/session.rs index 235516b0d..3cfa80b5a 100644 --- a/client/src/session.rs +++ b/client/src/session.rs @@ -431,7 +431,7 @@ impl Session { client_handle: item.client_handle(), sampling_interval: item.sampling_interval(), filter: ExtensionObject::null(), - queue_size: item.queue_size(), + queue_size: item.queue_size() as u32, discard_oldest: true, }, }) diff --git a/client/src/subscription.rs b/client/src/subscription.rs index a9fcd0fce..368936b8c 100644 --- a/client/src/subscription.rs +++ b/client/src/subscription.rs @@ -50,7 +50,7 @@ pub struct MonitoredItem { // The thing that is actually being monitored - the node id, attribute, index, encoding. item_to_monitor: ReadValueId, /// Queue size - queue_size: u32, + queue_size: usize, /// Discard oldest discard_oldest: bool, /// Monitoring mode @@ -58,7 +58,10 @@ pub struct MonitoredItem { /// Sampling interval sampling_interval: f64, /// Last value of the item - value: DataValue, + last_value: DataValue, + /// A list of all values received in the last data change notification. This list is cleared immediately + /// after the data change notification. + values: Vec, /// Triggered items triggered_items: BTreeSet, } @@ -67,7 +70,7 @@ impl MonitoredItem { pub fn new(client_handle: u32) -> MonitoredItem { MonitoredItem { id: 0, - queue_size: 0, + queue_size: 1, sampling_interval: 0.0, item_to_monitor: ReadValueId { node_id: NodeId::null(), @@ -77,7 +80,8 @@ impl MonitoredItem { }, monitoring_mode: MonitoringMode::Reporting, discard_oldest: false, - value: DataValue::null(), + last_value: DataValue::null(), + values: Vec::with_capacity(1), client_handle, triggered_items: BTreeSet::new(), } @@ -99,12 +103,27 @@ impl MonitoredItem { self.sampling_interval } - pub fn queue_size(&self) -> u32 { + pub fn queue_size(&self) -> usize { self.queue_size } - pub fn value(&self) -> &DataValue { - &self.value + pub fn last_value(&self) -> &DataValue { + &self.last_value + } + + pub fn values(&self) -> &Vec { + &self.values + } + + pub fn clear_values(&mut self) { + self.values.clear(); + } + + pub fn append_new_value(&mut self, value: DataValue) { + if self.values.len() == self.queue_size { + let _ = self.values.pop(); + self.values.push(value); + } } pub fn monitoring_mode(&self) -> MonitoringMode { @@ -127,8 +146,12 @@ impl MonitoredItem { self.sampling_interval = value; } - pub(crate) fn set_queue_size(&mut self, value: u32) { + pub(crate) fn set_queue_size(&mut self, value: usize) { self.queue_size = value; + if self.queue_size > self.values.capacity() { + self.values + .reserve(self.queue_size - self.values.capacity()); + } } pub(crate) fn set_monitoring_mode(&mut self, monitoring_mode: MonitoringMode) { @@ -272,7 +295,7 @@ impl Subscription { monitored_item.set_monitoring_mode(i.monitoring_mode); monitored_item.set_discard_oldest(i.discard_oldest); monitored_item.set_sampling_interval(i.sampling_interval); - monitored_item.set_queue_size(i.queue_size); + monitored_item.set_queue_size(i.queue_size as usize); monitored_item.set_item_to_monitor(i.item_to_monitor.clone()); let client_handle = monitored_item.client_handle(); @@ -287,7 +310,7 @@ impl Subscription { items_to_modify.iter().for_each(|i| { if let Some(ref mut monitored_item) = self.monitored_items.get_mut(&i.id) { monitored_item.set_sampling_interval(i.sampling_interval); - monitored_item.set_queue_size(i.queue_size); + monitored_item.set_queue_size(i.queue_size as usize); } }); } @@ -327,8 +350,9 @@ impl Subscription { pub(crate) fn on_data_change(&mut self, data_change_notifications: &[DataChangeNotification]) { let mut monitored_item_ids = HashSet::new(); - for n in data_change_notifications { + data_change_notifications.iter().for_each(|n| { if let Some(ref monitored_items) = n.monitored_items { + monitored_item_ids.clear(); for i in monitored_items { let monitored_item_id = { let monitored_item_id = self.monitored_item_id_from_handle(i.client_handle); @@ -338,21 +362,29 @@ impl Subscription { *monitored_item_id.as_ref().unwrap() }; let monitored_item = self.monitored_items.get_mut(&monitored_item_id).unwrap(); - monitored_item.value = i.value.clone(); + monitored_item.last_value = i.value.clone(); + monitored_item.values.push(i.value.clone()); monitored_item_ids.insert(monitored_item_id); } + if !monitored_item_ids.is_empty() { + let data_change_items: Vec<&MonitoredItem> = monitored_item_ids + .iter() + .map(|id| self.monitored_items.get(&id).unwrap()) + .collect(); + + { + // Call the call back with the changes we collected + let mut cb = trace_lock_unwrap!(self.notification_callback); + cb.on_data_change(&data_change_items); + } + + // Clear the values + monitored_item_ids.iter().for_each(|id| { + let m = self.monitored_items.get_mut(&id).unwrap(); + m.clear_values(); + }); + } } - } - - if !monitored_item_ids.is_empty() { - let data_change_items: Vec<&MonitoredItem> = monitored_item_ids - .iter() - .map(|id| self.monitored_items.get(&id).unwrap()) - .collect(); - - // Call the call back with the changes we collected - let mut cb = trace_lock_unwrap!(self.notification_callback); - cb.on_data_change(data_change_items); - } + }); } } diff --git a/samples/mqtt-client/src/main.rs b/samples/mqtt-client/src/main.rs index 1e242eacb..b595dde21 100644 --- a/samples/mqtt-client/src/main.rs +++ b/samples/mqtt-client/src/main.rs @@ -150,7 +150,7 @@ fn subscription_loop( let tx = tx.lock().unwrap(); items.iter().for_each(|item| { let node_id = item.item_to_monitor().node_id.clone(); - let value = item.value().clone(); + let value = item.last_value().clone(); let _ = tx.send((node_id, value)); }); }), diff --git a/samples/simple-client/src/main.rs b/samples/simple-client/src/main.rs index 5834a03a4..b7fd9239e 100644 --- a/samples/simple-client/src/main.rs +++ b/samples/simple-client/src/main.rs @@ -118,7 +118,7 @@ fn subscribe_to_variables(session: Arc>, ns: u16) -> Result<(), fn print_value(item: &MonitoredItem) { let node_id = &item.item_to_monitor().node_id; - let data_value = item.value(); + let data_value = item.last_value(); if let Some(ref value) = data_value.value { println!("Item \"{}\", Value = {:?}", node_id, value); } else { diff --git a/samples/web-client/src/main.rs b/samples/web-client/src/main.rs index f62c1030a..5a67c57e5 100644 --- a/samples/web-client/src/main.rs +++ b/samples/web-client/src/main.rs @@ -1,4 +1,5 @@ // OPCUA for Rust +// OPCUA for Rust // SPDX-License-Identifier: MPL-2.0 // Copyright (C) 2017-2020 Adam Lock @@ -415,7 +416,7 @@ impl OPCUASession { DataChangeEvent { node_id: item_to_monitor.node_id.clone().into(), attribute_id: item_to_monitor.attribute_id, - value: item.value().clone(), + value: item.last_value().clone(), } }) .collect::>();