Skip to content

Commit

Permalink
Adjust DataChangeCallback in preparation to allow it to process events
Browse files Browse the repository at this point in the history
  • Loading branch information
locka99 committed Jun 7, 2019
1 parent f60ccd8 commit 8be6566
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 19 deletions.
25 changes: 21 additions & 4 deletions client/src/callbacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,26 @@ use opcua_types::status_code::StatusCode;

use crate::subscription::MonitoredItem;

pub enum SubscriptionNotification<'a> {
DataChange(Vec<&'a MonitoredItem>),
Event,
}

/// This trait is implemented by something that wishes to receive subscription data change notifications.
pub trait OnDataChange {
fn data_change(&mut self, data_change_items: Vec<&MonitoredItem>);
pub trait OnSubscriptionNotification {
fn notification(&mut self, notification: SubscriptionNotification) {
match notification {
SubscriptionNotification::DataChange(data_change_items) => {
self.data_change(data_change_items);
}
Event => {
self.event();
}
}
}

fn data_change(&mut self, data_change_items: Vec<&MonitoredItem>) {}
fn event(&mut self /* TODO */) {}
}

/// This trait is implemented by something that wishes to receive connection status change notifications.
Expand All @@ -32,13 +49,13 @@ pub trait OnSessionClosed {
fn session_closed(&mut self, status_code: StatusCode);
}

/// This is a concrete implementation of [`OnDataChange`] that calls a function.
/// This is a concrete implementation of [`OnSubscriptionNotification`] that calls a function.
pub struct DataChangeCallback {
/// The actual call back
cb: Box<dyn Fn(Vec<&MonitoredItem>) + Send + Sync + 'static>
}

impl OnDataChange for DataChangeCallback {
impl OnSubscriptionNotification for DataChangeCallback {
fn data_change(&mut self, data_change_items: Vec<&MonitoredItem>) {
(self.cb)(data_change_items);
}
Expand Down
23 changes: 12 additions & 11 deletions client/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,12 @@
//! The session also has async functionality but that is reserved for publish requests on subscriptions
//! and events.
use std::{
cmp, thread,
convert::TryFrom,
result::Result,
collections::HashSet,
str::FromStr,
cmp, thread, convert::TryFrom, result::Result, collections::HashSet, str::FromStr,
sync::{Arc, Mutex, RwLock, mpsc},
time::{Instant, Duration},
};
use futures::{
future,
Future,
future, Future,
sync::mpsc::UnboundedSender,
stream::Stream,
};
Expand All @@ -34,7 +29,7 @@ use opcua_types::{
};

use crate::{
callbacks::{OnDataChange, OnConnectionStatusChange, OnSessionClosed},
callbacks::{OnSubscriptionNotification, OnConnectionStatusChange, OnSessionClosed},
client,
comms::tcp_transport::TcpTransport,
message_queue::MessageQueue,
Expand Down Expand Up @@ -1861,14 +1856,14 @@ impl Session {
///
pub fn create_subscription<CB>(&mut self, publishing_interval: f64, lifetime_count: u32, max_keep_alive_count: u32, max_notifications_per_publish: u32, priority: u8, publishing_enabled: bool, callback: CB)
-> Result<u32, StatusCode>
where CB: OnDataChange + Send + Sync + 'static {
where CB: OnSubscriptionNotification + Send + Sync + 'static {
self.create_subscription_inner(publishing_interval, lifetime_count, max_keep_alive_count, max_notifications_per_publish, priority, publishing_enabled, Arc::new(Mutex::new(callback)))
}

/// This is the internal handler for create subscription that receives the callback wrapped up and reference counted.
fn create_subscription_inner(&mut self, publishing_interval: f64, lifetime_count: u32, max_keep_alive_count: u32, max_notifications_per_publish: u32,
priority: u8, publishing_enabled: bool,
callback: Arc<Mutex<dyn OnDataChange + Send + Sync + 'static>>)
callback: Arc<Mutex<dyn OnSubscriptionNotification + Send + Sync + 'static>>)
-> Result<u32, StatusCode>
{
let request = CreateSubscriptionRequest {
Expand Down Expand Up @@ -2199,6 +2194,7 @@ impl Session {

let endpoint = &self.session_info.endpoint;
let policy = endpoint.find_policy(user_token_type);
debug!("Endpoint policy = {:?}", policy);

// Return the result
match policy {
Expand All @@ -2207,7 +2203,12 @@ impl Session {
Err(StatusCode::BadSecurityPolicyRejected)
}
Some(policy) => {
let security_policy = SecurityPolicy::from_uri(policy.security_policy_uri.as_ref());
let security_policy = if policy.security_policy_uri.is_null() {
// Assume None
SecurityPolicy::None
} else {
SecurityPolicy::from_uri(policy.security_policy_uri.as_ref())
};
if security_policy == SecurityPolicy::Unknown {
error!("Can't support the security policy {}", policy.security_policy_uri);
Err(StatusCode::BadSecurityPolicyRejected)
Expand Down
8 changes: 4 additions & 4 deletions client/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::marker::Sync;
use opcua_types::*;
use opcua_types::service_types::{DataChangeNotification, ReadValueId};

use crate::callbacks::OnDataChange;
use crate::callbacks::OnSubscriptionNotification;

pub(crate) struct CreateMonitoredItem {
pub id: u32,
Expand Down Expand Up @@ -146,7 +146,7 @@ pub struct Subscription {
priority: u8,
/// The change callback will be what is called if any monitored item changes within a cycle.
/// The monitored item is referenced by its id
data_change_callback: Arc<Mutex<OnDataChange + Send + Sync>>,
data_change_callback: Arc<Mutex<OnSubscriptionNotification + Send + Sync>>,
/// A map of monitored items associated with the subscription (key = monitored_item_id)
monitored_items: HashMap<u32, MonitoredItem>,
/// A map of client handle to monitored item id
Expand All @@ -156,7 +156,7 @@ pub struct Subscription {
impl Subscription {
/// Creates a new subscription using the supplied parameters and the supplied data change callback.
pub fn new(subscription_id: u32, publishing_interval: f64, lifetime_count: u32, max_keep_alive_count: u32, max_notifications_per_publish: u32,
publishing_enabled: bool, priority: u8, data_change_callback: Arc<Mutex<dyn OnDataChange + Send + Sync>>)
publishing_enabled: bool, priority: u8, data_change_callback: Arc<Mutex<dyn OnSubscriptionNotification + Send + Sync>>)
-> Subscription
{
Subscription {
Expand Down Expand Up @@ -189,7 +189,7 @@ impl Subscription {

pub fn priority(&self) -> u8 { self.priority }

pub fn data_change_callback(&self) -> Arc<Mutex<dyn OnDataChange + Send + Sync>> { self.data_change_callback.clone() }
pub fn data_change_callback(&self) -> Arc<Mutex<dyn OnSubscriptionNotification + Send + Sync>> { self.data_change_callback.clone() }

pub(crate) fn set_publishing_interval(&mut self, publishing_interval: f64) { self.publishing_interval = publishing_interval; }

Expand Down
1 change: 1 addition & 0 deletions samples/simple-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ fn main() {
.application_name("Simple Client")
.application_uri("urn:SimpleClient")
.trust_server_certs(true)
.session_retry_limit(0)
.client().unwrap();

if let Ok(session) = client.connect_to_endpoint((url.as_ref(), SecurityPolicy::None.to_str(), MessageSecurityMode::None, UserTokenPolicy::anonymous()), IdentityToken::Anonymous) {
Expand Down

0 comments on commit 8be6566

Please sign in to comment.