Skip to content

Commit

Permalink
locka99#95 on_data_change handler was throwing away all but last valu…
Browse files Browse the repository at this point in the history
…e so client couldn't see other values on a monitored item with a small monitoring rate with a long publish interval
  • Loading branch information
locka99 committed Mar 8, 2021
1 parent d5ffcdd commit 65386a8
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 33 deletions.
8 changes: 4 additions & 4 deletions client/src/callbacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::subscription::MonitoredItem;
pub trait OnSubscriptionNotification {
/// Called by the subscription after a `DataChangeNotification`. The default implementation
/// does nothing.
fn on_data_change(&mut self, _data_change_items: Vec<&MonitoredItem>) {}
fn on_data_change(&mut self, _data_change_items: &Vec<&MonitoredItem>) {}

/// Called by the subscription after a `EventNotificationList`. The notifications contained within
/// are individual `EventFieldList` structs filled from the select clause criteria from when the
Expand Down Expand Up @@ -64,11 +64,11 @@ pub trait OnSessionClosed {
/// a data change occurs.
pub struct DataChangeCallback {
/// The actual call back
cb: Box<dyn Fn(Vec<&MonitoredItem>) + Send + Sync + 'static>,
cb: Box<dyn Fn(&Vec<&MonitoredItem>) + Send + Sync + 'static>,
}

impl OnSubscriptionNotification for DataChangeCallback {
fn on_data_change(&mut self, data_change_items: Vec<&MonitoredItem>) {
fn on_data_change(&mut self, data_change_items: &Vec<&MonitoredItem>) {
(self.cb)(data_change_items);
}
}
Expand All @@ -77,7 +77,7 @@ impl DataChangeCallback {
/// Constructs a callback from the supplied function
pub fn new<CB>(cb: CB) -> Self
where
CB: Fn(Vec<&MonitoredItem>) + Send + Sync + 'static,
CB: Fn(&Vec<&MonitoredItem>) + Send + Sync + 'static,
{
Self { cb: Box::new(cb) }
}
Expand Down
2 changes: 1 addition & 1 deletion client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
//!
//! fn print_value(item: &MonitoredItem) {
//! let node_id = &item.item_to_monitor().node_id;
//! let data_value = item.value();
//! let data_value = item.last_value();
//! if let Some(ref value) = data_value.value {
//! println!("Item \"{}\", Value = {:?}", node_id, value);
//! } else {
Expand Down
2 changes: 1 addition & 1 deletion client/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ impl Session {
client_handle: item.client_handle(),
sampling_interval: item.sampling_interval(),
filter: ExtensionObject::null(),
queue_size: item.queue_size(),
queue_size: item.queue_size() as u32,
discard_oldest: true,
},
})
Expand Down
80 changes: 56 additions & 24 deletions client/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,18 @@ pub struct MonitoredItem {
// The thing that is actually being monitored - the node id, attribute, index, encoding.
item_to_monitor: ReadValueId,
/// Queue size
queue_size: u32,
queue_size: usize,
/// Discard oldest
discard_oldest: bool,
/// Monitoring mode
monitoring_mode: MonitoringMode,
/// Sampling interval
sampling_interval: f64,
/// Last value of the item
value: DataValue,
last_value: DataValue,
/// A list of all values received in the last data change notification. This list is cleared immediately
/// after the data change notification.
values: Vec<DataValue>,
/// Triggered items
triggered_items: BTreeSet<u32>,
}
Expand All @@ -67,7 +70,7 @@ impl MonitoredItem {
pub fn new(client_handle: u32) -> MonitoredItem {
MonitoredItem {
id: 0,
queue_size: 0,
queue_size: 1,
sampling_interval: 0.0,
item_to_monitor: ReadValueId {
node_id: NodeId::null(),
Expand All @@ -77,7 +80,8 @@ impl MonitoredItem {
},
monitoring_mode: MonitoringMode::Reporting,
discard_oldest: false,
value: DataValue::null(),
last_value: DataValue::null(),
values: Vec::with_capacity(1),
client_handle,
triggered_items: BTreeSet::new(),
}
Expand All @@ -99,12 +103,27 @@ impl MonitoredItem {
self.sampling_interval
}

pub fn queue_size(&self) -> u32 {
pub fn queue_size(&self) -> usize {
self.queue_size
}

pub fn value(&self) -> &DataValue {
&self.value
pub fn last_value(&self) -> &DataValue {
&self.last_value
}

pub fn values(&self) -> &Vec<DataValue> {
&self.values
}

pub fn clear_values(&mut self) {
self.values.clear();
}

pub fn append_new_value(&mut self, value: DataValue) {
if self.values.len() == self.queue_size {
let _ = self.values.pop();
self.values.push(value);
}
}

pub fn monitoring_mode(&self) -> MonitoringMode {
Expand All @@ -127,8 +146,12 @@ impl MonitoredItem {
self.sampling_interval = value;
}

pub(crate) fn set_queue_size(&mut self, value: u32) {
pub(crate) fn set_queue_size(&mut self, value: usize) {
self.queue_size = value;
if self.queue_size > self.values.capacity() {
self.values
.reserve(self.queue_size - self.values.capacity());
}
}

pub(crate) fn set_monitoring_mode(&mut self, monitoring_mode: MonitoringMode) {
Expand Down Expand Up @@ -272,7 +295,7 @@ impl Subscription {
monitored_item.set_monitoring_mode(i.monitoring_mode);
monitored_item.set_discard_oldest(i.discard_oldest);
monitored_item.set_sampling_interval(i.sampling_interval);
monitored_item.set_queue_size(i.queue_size);
monitored_item.set_queue_size(i.queue_size as usize);
monitored_item.set_item_to_monitor(i.item_to_monitor.clone());

let client_handle = monitored_item.client_handle();
Expand All @@ -287,7 +310,7 @@ impl Subscription {
items_to_modify.iter().for_each(|i| {
if let Some(ref mut monitored_item) = self.monitored_items.get_mut(&i.id) {
monitored_item.set_sampling_interval(i.sampling_interval);
monitored_item.set_queue_size(i.queue_size);
monitored_item.set_queue_size(i.queue_size as usize);
}
});
}
Expand Down Expand Up @@ -327,8 +350,9 @@ impl Subscription {

pub(crate) fn on_data_change(&mut self, data_change_notifications: &[DataChangeNotification]) {
let mut monitored_item_ids = HashSet::new();
for n in data_change_notifications {
data_change_notifications.iter().for_each(|n| {
if let Some(ref monitored_items) = n.monitored_items {
monitored_item_ids.clear();
for i in monitored_items {
let monitored_item_id = {
let monitored_item_id = self.monitored_item_id_from_handle(i.client_handle);
Expand All @@ -338,21 +362,29 @@ impl Subscription {
*monitored_item_id.as_ref().unwrap()
};
let monitored_item = self.monitored_items.get_mut(&monitored_item_id).unwrap();
monitored_item.value = i.value.clone();
monitored_item.last_value = i.value.clone();
monitored_item.values.push(i.value.clone());
monitored_item_ids.insert(monitored_item_id);
}
if !monitored_item_ids.is_empty() {
let data_change_items: Vec<&MonitoredItem> = monitored_item_ids
.iter()
.map(|id| self.monitored_items.get(&id).unwrap())
.collect();

{
// Call the call back with the changes we collected
let mut cb = trace_lock_unwrap!(self.notification_callback);
cb.on_data_change(&data_change_items);
}

// Clear the values
monitored_item_ids.iter().for_each(|id| {
let m = self.monitored_items.get_mut(&id).unwrap();
m.clear_values();
});
}
}
}

if !monitored_item_ids.is_empty() {
let data_change_items: Vec<&MonitoredItem> = monitored_item_ids
.iter()
.map(|id| self.monitored_items.get(&id).unwrap())
.collect();

// Call the call back with the changes we collected
let mut cb = trace_lock_unwrap!(self.notification_callback);
cb.on_data_change(data_change_items);
}
});
}
}
2 changes: 1 addition & 1 deletion samples/mqtt-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ fn subscription_loop(
let tx = tx.lock().unwrap();
items.iter().for_each(|item| {
let node_id = item.item_to_monitor().node_id.clone();
let value = item.value().clone();
let value = item.last_value().clone();
let _ = tx.send((node_id, value));
});
}),
Expand Down
2 changes: 1 addition & 1 deletion samples/simple-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ fn subscribe_to_variables(session: Arc<RwLock<Session>>, ns: u16) -> Result<(),

fn print_value(item: &MonitoredItem) {
let node_id = &item.item_to_monitor().node_id;
let data_value = item.value();
let data_value = item.last_value();
if let Some(ref value) = data_value.value {
println!("Item \"{}\", Value = {:?}", node_id, value);
} else {
Expand Down
3 changes: 2 additions & 1 deletion samples/web-client/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// OPCUA for Rust
// OPCUA for Rust
// SPDX-License-Identifier: MPL-2.0
// Copyright (C) 2017-2020 Adam Lock

Expand Down Expand Up @@ -415,7 +416,7 @@ impl OPCUASession {
DataChangeEvent {
node_id: item_to_monitor.node_id.clone().into(),
attribute_id: item_to_monitor.attribute_id,
value: item.value().clone(),
value: item.last_value().clone(),
}
})
.collect::<Vec<_>>();
Expand Down

0 comments on commit 65386a8

Please sign in to comment.