From 8be65662d0c5da2875eab3fd58d5d3b103304525 Mon Sep 17 00:00:00 2001 From: Adam Lock Date: Fri, 7 Jun 2019 20:33:28 +0100 Subject: [PATCH] Adjust DataChangeCallback in preparation to allow it to process events --- client/src/callbacks.rs | 25 +++++++++++++++++++++---- client/src/session.rs | 23 ++++++++++++----------- client/src/subscription.rs | 8 ++++---- samples/simple-client/src/main.rs | 1 + 4 files changed, 38 insertions(+), 19 deletions(-) diff --git a/client/src/callbacks.rs b/client/src/callbacks.rs index 1d37d7348..53283ba5d 100644 --- a/client/src/callbacks.rs +++ b/client/src/callbacks.rs @@ -10,9 +10,26 @@ use opcua_types::status_code::StatusCode; use crate::subscription::MonitoredItem; +pub enum SubscriptionNotification<'a> { + DataChange(Vec<&'a MonitoredItem>), + Event, +} + /// This trait is implemented by something that wishes to receive subscription data change notifications. -pub trait OnDataChange { - fn data_change(&mut self, data_change_items: Vec<&MonitoredItem>); +pub trait OnSubscriptionNotification { + fn notification(&mut self, notification: SubscriptionNotification) { + match notification { + SubscriptionNotification::DataChange(data_change_items) => { + self.data_change(data_change_items); + } + Event => { + self.event(); + } + } + } + + fn data_change(&mut self, data_change_items: Vec<&MonitoredItem>) {} + fn event(&mut self /* TODO */) {} } /// This trait is implemented by something that wishes to receive connection status change notifications. @@ -32,13 +49,13 @@ pub trait OnSessionClosed { fn session_closed(&mut self, status_code: StatusCode); } -/// This is a concrete implementation of [`OnDataChange`] that calls a function. +/// This is a concrete implementation of [`OnSubscriptionNotification`] that calls a function. pub struct DataChangeCallback { /// The actual call back cb: Box) + Send + Sync + 'static> } -impl OnDataChange for DataChangeCallback { +impl OnSubscriptionNotification for DataChangeCallback { fn data_change(&mut self, data_change_items: Vec<&MonitoredItem>) { (self.cb)(data_change_items); } diff --git a/client/src/session.rs b/client/src/session.rs index 0955c9f18..4dc435b08 100644 --- a/client/src/session.rs +++ b/client/src/session.rs @@ -4,17 +4,12 @@ //! The session also has async functionality but that is reserved for publish requests on subscriptions //! and events. use std::{ - cmp, thread, - convert::TryFrom, - result::Result, - collections::HashSet, - str::FromStr, + cmp, thread, convert::TryFrom, result::Result, collections::HashSet, str::FromStr, sync::{Arc, Mutex, RwLock, mpsc}, time::{Instant, Duration}, }; use futures::{ - future, - Future, + future, Future, sync::mpsc::UnboundedSender, stream::Stream, }; @@ -34,7 +29,7 @@ use opcua_types::{ }; use crate::{ - callbacks::{OnDataChange, OnConnectionStatusChange, OnSessionClosed}, + callbacks::{OnSubscriptionNotification, OnConnectionStatusChange, OnSessionClosed}, client, comms::tcp_transport::TcpTransport, message_queue::MessageQueue, @@ -1861,14 +1856,14 @@ impl Session { /// pub fn create_subscription(&mut self, publishing_interval: f64, lifetime_count: u32, max_keep_alive_count: u32, max_notifications_per_publish: u32, priority: u8, publishing_enabled: bool, callback: CB) -> Result - where CB: OnDataChange + Send + Sync + 'static { + where CB: OnSubscriptionNotification + Send + Sync + 'static { self.create_subscription_inner(publishing_interval, lifetime_count, max_keep_alive_count, max_notifications_per_publish, priority, publishing_enabled, Arc::new(Mutex::new(callback))) } /// This is the internal handler for create subscription that receives the callback wrapped up and reference counted. fn create_subscription_inner(&mut self, publishing_interval: f64, lifetime_count: u32, max_keep_alive_count: u32, max_notifications_per_publish: u32, priority: u8, publishing_enabled: bool, - callback: Arc>) + callback: Arc>) -> Result { let request = CreateSubscriptionRequest { @@ -2199,6 +2194,7 @@ impl Session { let endpoint = &self.session_info.endpoint; let policy = endpoint.find_policy(user_token_type); + debug!("Endpoint policy = {:?}", policy); // Return the result match policy { @@ -2207,7 +2203,12 @@ impl Session { Err(StatusCode::BadSecurityPolicyRejected) } Some(policy) => { - let security_policy = SecurityPolicy::from_uri(policy.security_policy_uri.as_ref()); + let security_policy = if policy.security_policy_uri.is_null() { + // Assume None + SecurityPolicy::None + } else { + SecurityPolicy::from_uri(policy.security_policy_uri.as_ref()) + }; if security_policy == SecurityPolicy::Unknown { error!("Can't support the security policy {}", policy.security_policy_uri); Err(StatusCode::BadSecurityPolicyRejected) diff --git a/client/src/subscription.rs b/client/src/subscription.rs index 0595d27a7..00533fdcc 100644 --- a/client/src/subscription.rs +++ b/client/src/subscription.rs @@ -15,7 +15,7 @@ use std::marker::Sync; use opcua_types::*; use opcua_types::service_types::{DataChangeNotification, ReadValueId}; -use crate::callbacks::OnDataChange; +use crate::callbacks::OnSubscriptionNotification; pub(crate) struct CreateMonitoredItem { pub id: u32, @@ -146,7 +146,7 @@ pub struct Subscription { priority: u8, /// The change callback will be what is called if any monitored item changes within a cycle. /// The monitored item is referenced by its id - data_change_callback: Arc>, + data_change_callback: Arc>, /// A map of monitored items associated with the subscription (key = monitored_item_id) monitored_items: HashMap, /// A map of client handle to monitored item id @@ -156,7 +156,7 @@ pub struct Subscription { impl Subscription { /// Creates a new subscription using the supplied parameters and the supplied data change callback. pub fn new(subscription_id: u32, publishing_interval: f64, lifetime_count: u32, max_keep_alive_count: u32, max_notifications_per_publish: u32, - publishing_enabled: bool, priority: u8, data_change_callback: Arc>) + publishing_enabled: bool, priority: u8, data_change_callback: Arc>) -> Subscription { Subscription { @@ -189,7 +189,7 @@ impl Subscription { pub fn priority(&self) -> u8 { self.priority } - pub fn data_change_callback(&self) -> Arc> { self.data_change_callback.clone() } + pub fn data_change_callback(&self) -> Arc> { self.data_change_callback.clone() } pub(crate) fn set_publishing_interval(&mut self, publishing_interval: f64) { self.publishing_interval = publishing_interval; } diff --git a/samples/simple-client/src/main.rs b/samples/simple-client/src/main.rs index e580a6a60..549184787 100644 --- a/samples/simple-client/src/main.rs +++ b/samples/simple-client/src/main.rs @@ -29,6 +29,7 @@ fn main() { .application_name("Simple Client") .application_uri("urn:SimpleClient") .trust_server_certs(true) + .session_retry_limit(0) .client().unwrap(); if let Ok(session) = client.connect_to_endpoint((url.as_ref(), SecurityPolicy::None.to_str(), MessageSecurityMode::None, UserTokenPolicy::anonymous()), IdentityToken::Anonymous) {