diff --git a/client/src/session.rs b/client/src/session.rs index 1fe47378d..e22b1d69a 100644 --- a/client/src/session.rs +++ b/client/src/session.rs @@ -328,7 +328,7 @@ impl Session { // For each monitored item let items_to_create = subscription.monitored_items().iter().map(|(_, item)| { MonitoredItemCreateRequest { - item_to_monitor: item.item_to_monitor(), + item_to_monitor: item.item_to_monitor().clone(), monitoring_mode: item.monitoring_mode(), requested_parameters: MonitoringParameters { client_handle: item.client_handle(), diff --git a/client/src/subscription.rs b/client/src/subscription.rs index 88c7f3698..0595d27a7 100644 --- a/client/src/subscription.rs +++ b/client/src/subscription.rs @@ -39,7 +39,7 @@ pub struct MonitoredItem { id: u32, /// Monitored item's handle. Used internally - not modifiable client_handle: u32, - // Item to monitor + // The thing that is actually being monitored - the node id, attribute, index, encoding. item_to_monitor: ReadValueId, /// Queue size queue_size: u32, @@ -81,14 +81,14 @@ impl MonitoredItem { self.client_handle } - pub fn item_to_monitor(&self) -> ReadValueId { self.item_to_monitor.clone() } + pub fn item_to_monitor(&self) -> &ReadValueId { &self.item_to_monitor } pub fn sampling_interval(&self) -> f64 { self.sampling_interval } pub fn queue_size(&self) -> u32 { self.queue_size } - pub fn value(&self) -> DataValue { - self.value.clone() + pub fn value(&self) -> &DataValue { + &self.value } pub fn monitoring_mode(&self) -> MonitoringMode { self.monitoring_mode } @@ -146,7 +146,7 @@ pub struct Subscription { priority: u8, /// The change callback will be what is called if any monitored item changes within a cycle. /// The monitored item is referenced by its id - data_change_callback: Arc>, + data_change_callback: Arc>, /// A map of monitored items associated with the subscription (key = monitored_item_id) monitored_items: HashMap, /// A map of client handle to monitored item id @@ -156,7 +156,7 @@ pub struct Subscription { impl Subscription { /// Creates a new subscription using the supplied parameters and the supplied data change callback. pub fn new(subscription_id: u32, publishing_interval: f64, lifetime_count: u32, max_keep_alive_count: u32, max_notifications_per_publish: u32, - publishing_enabled: bool, priority: u8, data_change_callback: Arc>) + publishing_enabled: bool, priority: u8, data_change_callback: Arc>) -> Subscription { Subscription { @@ -189,7 +189,7 @@ impl Subscription { pub fn priority(&self) -> u8 { self.priority } - pub fn data_change_callback(&self) -> Arc> { self.data_change_callback.clone() } + pub fn data_change_callback(&self) -> Arc> { self.data_change_callback.clone() } pub(crate) fn set_publishing_interval(&mut self, publishing_interval: f64) { self.publishing_interval = publishing_interval; } diff --git a/samples/discovery-client/src/main.rs b/samples/discovery-client/src/main.rs index ce1a7aa68..79acccf16 100644 --- a/samples/discovery-client/src/main.rs +++ b/samples/discovery-client/src/main.rs @@ -2,7 +2,6 @@ use std::str::FromStr; use opcua_client::prelude::*; -use opcua_types::url::is_opc_ua_binary_url; fn main() { // Read the argument @@ -26,41 +25,43 @@ fn main() { // The client API has a simple `find_servers` function that connects and returns servers for us. let mut client = Client::new(ClientConfig::new("DiscoveryClient", "urn:DiscoveryClient")); - let servers = client.find_servers(url); - if let Ok(servers) = servers { - println!("Discovery server responded with {} servers:", servers.len()); - for server in &servers { - // Each server is an `ApplicationDescription` - println!("Server : {}", server.application_name); - if let Some(ref discovery_urls) = server.discovery_urls { - for discovery_url in discovery_urls { - println!(" {}", discovery_url); - if is_opc_ua_binary_url(discovery_url.as_ref()) { - // Try to talk with it and get some endpoints - let client_config = ClientConfig::new("discovery-client", "urn:discovery-client"); - let client = Client::new(client_config); - - // Ask the server associated with the default endpoint for its list of endpoints - match client.get_server_endpoints_from_url(discovery_url.as_ref()) { - Result::Err(status_code) => { - println!(" ERROR: Cannot get endpoints for this server url, error - {}", status_code); - continue; - } - Result::Ok(endpoints) => { - println!(" Server has these endpoints:"); - for e in &endpoints { - println!(" {} - {:?} / {:?}", e.endpoint_url, SecurityPolicy::from_str(e.security_policy_uri.as_ref()).unwrap(), e.security_mode); - } - } - } - } + match client.find_servers(url) { + Ok(servers) => { + println!("Discovery server responded with {} servers:", servers.len()); + servers.iter().for_each(|server| { + // Each server is an `ApplicationDescription` + println!("Server : {}", server.application_name); + if let Some(ref discovery_urls) = server.discovery_urls { + discovery_urls.iter().for_each(|discovery_url| print_server_endpoints(discovery_url.as_ref())); + } else { + println!(" No discovery urls for this server"); } - } else { - println!(" No discovery urls for this server"); - } + }); + } + Err(err) => { + println!("ERROR: Cannot find servers on discovery server - check this error - {:?}", err); } - } else { - println!("ERROR: Cannot find servers on discovery server - check this error - {:?}", servers.unwrap_err()); } } +fn print_server_endpoints(discovery_url: &str) { + println!(" {}", discovery_url); + if is_opc_ua_binary_url(discovery_url) { + // Try to talk with it and get some endpoints + let client_config = ClientConfig::new("discovery-client", "urn:discovery-client"); + let client = Client::new(client_config); + + // Ask the server associated with the default endpoint for its list of endpoints + match client.get_server_endpoints_from_url(discovery_url) { + Result::Ok(endpoints) => { + println!(" Server has these endpoints:"); + endpoints.iter().for_each(|e| { + println!(" {} - {:?} / {:?}", e.endpoint_url, SecurityPolicy::from_str(e.security_policy_uri.as_ref()).unwrap(), e.security_mode); + }); + } + Result::Err(status_code) => { + println!(" ERROR: Cannot get endpoints for this server url, error - {}", status_code); + } + } + } +} \ No newline at end of file diff --git a/samples/mqtt-client/src/main.rs b/samples/mqtt-client/src/main.rs index 3b656f7c1..fdb820b25 100644 --- a/samples/mqtt-client/src/main.rs +++ b/samples/mqtt-client/src/main.rs @@ -104,23 +104,16 @@ fn subscription_loop(session: Arc>, tx: mpsc::Sender<(NodeId, Da println!("Data change from server:"); let tx = tx.lock().unwrap(); items.iter().for_each(|item| { - let node_id = item.item_to_monitor().node_id; - let value = item.value(); + let node_id = item.item_to_monitor().node_id.clone(); + let value = item.value().clone(); let _ = tx.send((node_id, value)); }); }))?; println!("Created a subscription with id = {}", subscription_id); // Create some monitored items - let read_nodes = vec![ - ReadValueId::from(NodeId::new(2, "v1")), - ReadValueId::from(NodeId::new(2, "v2")), - ReadValueId::from(NodeId::new(2, "v3")), - ReadValueId::from(NodeId::new(2, "v4")), - ]; - let items_to_create: Vec = read_nodes.into_iter().map(|read_node| { - MonitoredItemCreateRequest::new(read_node, MonitoringMode::Reporting, MonitoringParameters::default()) - }).collect(); + let items_to_create: Vec = ["v1", "v2", "v3", "v4"].iter() + .map(|v| NodeId::new(2, *v).into()).collect(); let _ = session.create_monitored_items(subscription_id, TimestampsToReturn::Both, &items_to_create)?; } diff --git a/samples/simple-client/src/main.rs b/samples/simple-client/src/main.rs index 82089140d..e580a6a60 100644 --- a/samples/simple-client/src/main.rs +++ b/samples/simple-client/src/main.rs @@ -2,9 +2,7 @@ //! //! 1. Read a configuration file (either default or the one specified using --config) //! 2. Connect & create a session on one of those endpoints that match with its config (you can override which using --endpoint-id arg) -//! 3. Either: -//! a) Read some values and exit -//! b) Subscribe to values and loop forever printing out their values (using --subscribe) +//! 3. Subscribe to values and loop forever printing out their values use std::sync::{Arc, RwLock}; use clap::{App, Arg}; @@ -13,17 +11,15 @@ use opcua_client::prelude::*; fn main() { // Read command line arguments - let url = { - let m = App::new("Simple OPC UA Client") - .arg(Arg::with_name("url") - .long("url") - .help("Specify the OPC UA endpoint to connect to") - .takes_value(true) - .default_value("opc.tcp://localhost:4855") - .required(false)) - .get_matches(); - m.value_of("url").unwrap().to_string() - }; + let m = App::new("Simple OPC UA Client") + .arg(Arg::with_name("url") + .long("url") + .help("Specify the OPC UA endpoint to connect to") + .takes_value(true) + .default_value("opc.tcp://localhost:4855") + .required(false)) + .get_matches(); + let url = m.value_of("url").unwrap().to_string(); // Optional - enable OPC UA logging opcua_console_logging::init(); @@ -36,48 +32,35 @@ fn main() { .client().unwrap(); if let Ok(session) = client.connect_to_endpoint((url.as_ref(), SecurityPolicy::None.to_str(), MessageSecurityMode::None, UserTokenPolicy::anonymous()), IdentityToken::Anonymous) { - if let Err(result) = subscription_loop(session) { - println!("ERROR: Got an error while performing action - {}", result); + if let Err(result) = subscribe_to_variables(session.clone()) { + println!("ERROR: Got an error while subscribing to variables - {}", result); + } else { + // Loops forever. The publish thread will call the callback with changes on the variables + let _ = Session::run(session); } } } -fn subscription_loop(session: Arc>) -> Result<(), StatusCode> { - // This scope is important - we don't want the session to be locked when the code hits the - // loop below - { - let mut session = session.write().unwrap(); +fn subscribe_to_variables(session: Arc>) -> Result<(), StatusCode> { + let mut session = session.write().unwrap(); + // Creates a subscription with a data change callback + let subscription_id = session.create_subscription(2000.0, 10, 30, 0, 0, true, DataChangeCallback::new(|changed_monitored_items| { + println!("Data change from server:"); + changed_monitored_items.iter().for_each(|item| print_value(item)); + }))?; + println!("Created a subscription with id = {}", subscription_id); - // Creates our subscription - let subscription_id = session.create_subscription(2000.0, 10, 30, 0, 0, true, DataChangeCallback::new(|items| { - println!("Data change from server:"); - items.iter().for_each(|item| { - print_value(&item.item_to_monitor(), &item.value()); - }); - }))?; - println!("Created a subscription with id = {}", subscription_id); - - // Create some monitored items - let read_nodes = vec![ - ReadValueId::from(NodeId::new(2, "v1")), - ReadValueId::from(NodeId::new(2, "v2")), - ReadValueId::from(NodeId::new(2, "v3")), - ReadValueId::from(NodeId::new(2, "v4")), - ]; - let items_to_create: Vec = read_nodes.into_iter().map(|read_node| { - MonitoredItemCreateRequest::new(read_node, MonitoringMode::Reporting, MonitoringParameters::default()) - }).collect(); - let _ = session.create_monitored_items(subscription_id, TimestampsToReturn::Both, &items_to_create)?; - } - - // Loops forever. The publish thread will call the callback with changes on the variables - let _ = Session::run(session); + // Create some monitored items + let items_to_create: Vec = ["v1", "v2", "v3", "v4"].iter() + .map(|v| NodeId::new(2, *v).into()).collect(); + let _ = session.create_monitored_items(subscription_id, TimestampsToReturn::Both, &items_to_create)?; Ok(()) } -fn print_value(read_value_id: &ReadValueId, data_value: &DataValue) { - let node_id = read_value_id.node_id.to_string(); +fn print_value(item: &MonitoredItem) { + let node_id = &item.item_to_monitor().node_id; + let data_value = item.value(); if let Some(ref value) = data_value.value { println!("Item \"{}\", Value = {:?}", node_id, value); } else { diff --git a/samples/web-client/src/main.rs b/samples/web-client/src/main.rs index 7375e4e3f..e1a5d302a 100644 --- a/samples/web-client/src/main.rs +++ b/samples/web-client/src/main.rs @@ -219,7 +219,7 @@ impl OPCUASession { // Create some monitored items let items_to_create: Vec = node_ids.iter().map(|node_id| { let node_id = NodeId::from_str(node_id).unwrap(); // Trust client to not break this - MonitoredItemCreateRequest::new(node_id.into(), MonitoringMode::Reporting, MonitoringParameters::default()) + node_id.into() }).collect(); let _results = session.create_monitored_items(subscription_id, TimestampsToReturn::Both, &items_to_create); } diff --git a/server/src/comms/tcp_transport.rs b/server/src/comms/tcp_transport.rs index 043b02512..adfc0ae31 100644 --- a/server/src/comms/tcp_transport.rs +++ b/server/src/comms/tcp_transport.rs @@ -584,7 +584,7 @@ impl TcpTransport { /// Start the subscription timer to service subscriptions fn spawn_subscriptions_task(transport: Arc>, sender: UnboundedSender<(u32, SupportedMessage)>) { /// Subscription events are passed sent from the monitor task to the receiver - #[derive(Clone, Debug, PartialEq)] + #[derive(Clone, Debug)] enum SubscriptionEvent { PublishResponses(VecDeque), } diff --git a/server/src/subscriptions/mod.rs b/server/src/subscriptions/mod.rs index 17e4f9bb2..ffb00dd58 100644 --- a/server/src/subscriptions/mod.rs +++ b/server/src/subscriptions/mod.rs @@ -18,7 +18,7 @@ pub struct PublishRequestEntry { pub results: Option>, } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug)] pub struct PublishResponseEntry { pub request_id: u32, pub response: SupportedMessage, diff --git a/types/src/service_types/impls.rs b/types/src/service_types/impls.rs index 65da87b31..d10523639 100644 --- a/types/src/service_types/impls.rs +++ b/types/src/service_types/impls.rs @@ -493,6 +493,12 @@ impl SignatureData { } } +impl Into for NodeId { + fn into(self) -> MonitoredItemCreateRequest { + MonitoredItemCreateRequest::new(self.into(), MonitoringMode::Reporting, MonitoringParameters::default()) + } +} + impl MonitoredItemCreateRequest { /// Adds an item to monitor to the subscription pub fn new(item_to_monitor: ReadValueId, monitoring_mode: MonitoringMode, requested_parameters: MonitoringParameters) -> MonitoredItemCreateRequest {