diff --git a/Cargo.lock b/Cargo.lock index bf41f253b..2ed9cbfa8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -314,14 +314,14 @@ dependencies = [ [[package]] name = "clap" -version = "2.32.0" +version = "2.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "atty 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", "bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", - "strsim 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", - "textwrap 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", + "strsim 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", + "textwrap 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "unicode-width 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "vec_map 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1256,7 +1256,7 @@ dependencies = [ name = "opcua-certificate-creator" version = "0.6.0" dependencies = [ - "clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)", + "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "opcua-core 0.6.0", "opcua-types 0.6.0", ] @@ -1334,7 +1334,7 @@ dependencies = [ name = "opcua-discovery-client" version = "0.6.0" dependencies = [ - "clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)", + "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "opcua-client 0.6.0", "opcua-console-logging 0.6.0", "opcua-core 0.6.0", @@ -1345,7 +1345,7 @@ dependencies = [ name = "opcua-gfx-client" version = "0.6.0" dependencies = [ - "clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)", + "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "conrod 0.61.1 (registry+https://github.com/rust-lang/crates.io-index)", "opcua-client 0.6.0", "opcua-console-logging 0.6.0", @@ -1369,7 +1369,7 @@ dependencies = [ name = "opcua-mqtt-client" version = "0.6.0" dependencies = [ - "clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)", + "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "opcua-client 0.6.0", "opcua-console-logging 0.6.0", "rumqtt 0.30.0 (git+https://github.com/AtherEnergy/rumqtt.git?rev=83b4694525061e2ccef617c0ac867db2044cc4e7)", @@ -1404,7 +1404,7 @@ dependencies = [ name = "opcua-simple-client" version = "0.6.0" dependencies = [ - "clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)", + "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "opcua-client 0.6.0", "opcua-console-logging 0.6.0", ] @@ -2203,7 +2203,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "strsim" -version = "0.7.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -2279,7 +2279,7 @@ dependencies = [ [[package]] name = "textwrap" -version = "0.10.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "unicode-width 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2990,7 +2990,7 @@ dependencies = [ "checksum cfg-if 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "11d43355396e872eefb45ce6342e4374ed7bc2b3a502d1b28e36d6e23c05d1f4" "checksum cgl 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "55e7ec0b74fe5897894cbc207092c577e87c52f8a59e8ca8d97ef37551f60a49" "checksum chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "45912881121cb26fad7c38c17ba7daa18764771836b34fab7d3fbd93ed633878" -"checksum clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b957d88f4b6a63b9d70d5f454ac8011819c6efa7727858f458ab71c756ce2d3e" +"checksum clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5067f5bb2d80ef5d68b4c87db81601f0b75bca627bc2ef76b141d7b846a3c6d9" "checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" "checksum cocoa 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7b44bd25bd275e9d74a5dff8ca55f2fb66c9ad5e12170d58697701df21a56e0e" "checksum conrod 0.61.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d4cd54996c99c3dac7e1f4ad1781979509639a43cdeca107036f3de526a534cd" @@ -3181,7 +3181,7 @@ dependencies = [ "checksum stable_deref_trait 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "dba1a27d3efae4351c8051072d619e3ade2820635c3958d826bfea39d59b54c8" "checksum stb_truetype 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "69b7df505db8e81d54ff8be4693421e5b543e08214bd8d99eb761fcb4d5668ba" "checksum string 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "b639411d0b9c738748b5397d5ceba08e648f4f1992231aa859af1a017f31f60b" -"checksum strsim 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bb4f380125926a99e52bc279241539c018323fab05ad6368b56f93d9369ff550" +"checksum strsim 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" "checksum syn 0.13.11 (registry+https://github.com/rust-lang/crates.io-index)" = "14f9bf6292f3a61d2c716723fdb789a41bbe104168e6f496dc6497e531ea1b9b" "checksum syn 0.15.29 (registry+https://github.com/rust-lang/crates.io-index)" = "1825685f977249735d510a242a6727b46efe914bb67e38d30c071b1b72b1d5c2" "checksum synstructure 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "73687139bf99285483c96ac0add482c3776528beac1d97d444f6e91f203a2015" @@ -3189,7 +3189,7 @@ dependencies = [ "checksum tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)" = "b86c784c88d98c801132806dadd3819ed29d8600836c4088e855cdf3e178ed8a" "checksum termcolor 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "4096add70612622289f2fdcdbd5086dc81c1e2675e6ae58d6c4f62a16c6d7f2f" "checksum termion 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "689a3bdfaab439fd92bc87df5c4c78417d3cbe537487274e9b0b2dce76e92096" -"checksum textwrap 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "307686869c93e71f94da64286f9a9524c0f308a9e1c87a583de8e9c9039ad3f6" +"checksum textwrap 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" "checksum thread-id 3.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c7fbf4c9d56b320106cd64fd024dadfa0be7cb4706725fc44a7d7ce952d820c1" "checksum thread_local 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c6b53e329000edc2b34dbe8545fd20e55a333362d0a321909685a19bd28c3f1b" "checksum time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)" = "db8dcfca086c1143c9270ac42a2bbd8a7ee477b78ac8e45b19abfb0cbede4b6f" diff --git a/client/src/callbacks.rs b/client/src/callbacks.rs index 2e1365a84..1d37d7348 100644 --- a/client/src/callbacks.rs +++ b/client/src/callbacks.rs @@ -46,8 +46,8 @@ impl OnDataChange for DataChangeCallback { impl DataChangeCallback { /// Constructs a callback from the supplied function - pub fn new(cb: CB) -> DataChangeCallback where CB: Fn(Vec<&MonitoredItem>) + Send + Sync + 'static { - DataChangeCallback { + pub fn new(cb: CB) -> Self where CB: Fn(Vec<&MonitoredItem>) + Send + Sync + 'static { + Self { cb: Box::new(cb) } } @@ -77,8 +77,8 @@ impl OnConnectionStatusChange for ConnectionStatusCallback { impl ConnectionStatusCallback { // Constructor - pub fn new(cb: CB) -> ConnectionStatusCallback where CB: FnMut(bool) + Send + Sync + 'static { - ConnectionStatusCallback { + pub fn new(cb: CB) -> Self where CB: FnMut(bool) + Send + Sync + 'static { + Self { cb: Box::new(cb) } } @@ -96,8 +96,8 @@ impl OnSessionClosed for SessionClosedCallback { impl SessionClosedCallback { // Constructor - pub fn new(cb: CB) -> SessionClosedCallback where CB: FnMut(StatusCode) + Send + Sync + 'static { - SessionClosedCallback { + pub fn new(cb: CB) -> Self where CB: FnMut(StatusCode) + Send + Sync + 'static { + Self { cb: Box::new(cb) } } diff --git a/client/src/session.rs b/client/src/session.rs index d45313efb..c841960c5 100644 --- a/client/src/session.rs +++ b/client/src/session.rs @@ -130,6 +130,10 @@ impl Session { /// * `certificate_store` - certificate management on disk /// * `session_info` - information required to establish a new session. /// + /// # Returns + /// + /// * `Session` - the interface that shall be used to communicate between the client and the server. + /// pub(crate) fn new(application_description: ApplicationDescription, certificate_store: Arc>, session_info: SessionInfo, session_retry_policy: SessionRetryPolicy) -> Session { // TODO take these from the client config let decoding_limits = DecodingLimits::default(); @@ -157,6 +161,12 @@ impl Session { /// Connects to the server, creates and activates a session. If there /// is a failure, it will be communicated by the status code in the result. + /// + /// # Returns + /// + /// * `Ok(())` - connection has happened and the session is activated + /// * `Err(StatusCode)` - reason for failure + /// pub fn connect_and_activate(&mut self) -> Result<(), StatusCode> { // Connect now using the session state self.connect()?; @@ -165,12 +175,24 @@ impl Session { Ok(()) } - /// Sets the session retry policy + /// Sets the session retry policy that dictates what this session will do if the connection + /// fails or goes down. The retry policy enables the session to retry a connection on an + /// interval up to a maxmimum number of times. + /// + /// # Arguments + /// + /// * `session_retry_policy` - the session retry policy to use + /// pub fn set_session_retry_policy(&mut self, session_retry_policy: SessionRetryPolicy) { self.session_retry_policy = session_retry_policy; } /// Register a callback to be notified when the session has been closed. + /// + /// # Arguments + /// + /// * `session_closed_callback` - the session closed callback + /// pub fn set_session_closed_callback(&mut self, session_closed_callback: CB) where CB: OnSessionClosed + Send + Sync + 'static { let mut session_state = trace_write_lock_unwrap!(self.session_state); session_state.set_session_closed_callback(session_closed_callback); @@ -178,12 +200,27 @@ impl Session { /// Registers a callback to be notified when the session connection status has changed. /// This will be called if connection status changes from connected to disconnected or vice versa. - pub fn set_connection_status_callback(&mut self, callback: CB) where CB: OnConnectionStatusChange + Send + Sync + 'static { - self.connection_status_callback = Some(Box::new(callback)); + /// + /// # Arguments + /// + /// * `connection_status_callback` - the connection status callback. + /// + pub fn set_connection_status_callback(&mut self, connection_status_callback: CB) where CB: OnConnectionStatusChange + Send + Sync + 'static { + self.connection_status_callback = Some(Box::new(connection_status_callback)); } /// Reconnects to the server and tries to activate the existing session. If there - /// is a failure, it will be communicated by the status code in the result. + /// is a failure, it will be communicated by the status code in the result. You should not + /// call this if there is a session retry policy associated with the session. + /// + /// Reconnecting will attempt to transfer or recreate subscriptions that were on the old + /// session before it terminated. + /// + /// # Returns + /// + /// * `Ok(())` - reconnection has happened and the session is activated + /// * `Err(StatusCode)` - reason for failure + /// pub fn reconnect_and_activate(&mut self) -> Result<(), StatusCode> { // Do nothing if already connected / activated if self.is_connected() { @@ -367,6 +404,12 @@ impl Session { /// Connects to the server using the configured session arguments. No attempt is made to retry /// the connection if the attempt fails. If there is a failure, it will be communicated by the /// status code in the result. + /// + /// # Returns + /// + /// * `Ok(())` - connection has happened + /// * `Err(StatusCode)` - reason for failure + /// pub fn connect_no_retry(&mut self) -> Result<(), StatusCode> { let endpoint_url = self.session_info.endpoint.endpoint_url.clone(); info!("Connect"); @@ -406,19 +449,31 @@ impl Session { } /// Test if the session is in a connected state + /// + /// # Returns + /// + /// * `true` - Session is connected + /// * `false` - Session is not connected + /// pub fn is_connected(&self) -> bool { self.transport.is_connected() } + /// Internal constant for the sleep interval used during polling const POLL_SLEEP_INTERVAL: u64 = 50; - /// Runs a polling loop for this session to perform periodic activity such as processing subscriptions, - /// as well as recovering from connection errors. The run command will break if the session is disconnected + /// Synchronously runs a polling loop over the supplied session. The run command performs + /// periodic actions such as receiving messages, processing subscriptions, and recovering from + /// connection errors. The run command will break if the session is disconnected /// and cannot be reestablished. /// /// The `run()` function returns a `Sender` that can be used to send a `()` message to the session /// to cause it to terminate. /// + /// # Arguments + /// + /// * `session` - the session to run ynchronously + /// /// # Returns /// /// * `mpsc::Sender<()>` - A sender that allows the caller to send a single unity message to the @@ -430,12 +485,20 @@ impl Session { tx } - /// Runs the server asynchronously on a new thread, allowing the calling - /// thread to continue do other things. + /// Asynchronously runs a polling loop over the supplied session. The run command performs + /// periodic actions such as receiving messages, processing subscriptions, and recovering from + /// connection errors. The run command will break if the session is disconnected + /// and cannot be reestablished. + /// + /// The session runs on a separate thread so the call will return immediately. /// /// The `run()` function returns a `Sender` that can be used to send a `()` message to the session /// to cause it to terminate. /// + /// # Arguments + /// + /// * `session` - the session to run asynchronously + /// /// # Returns /// /// * `mpsc::Sender<()>` - A sender that allows the caller to send a single unity message to the @@ -471,7 +534,16 @@ impl Session { /// async responses, attempts to reconnect if the client is disconnected from the client and /// sleeps a little bit if nothing needed to be done. /// - /// Returns `true` if it did something, `false` if it caused the thread to sleep for a bit. + /// # Arguments + /// + /// * `sleep_for` - the period of time in milliseconds that poll should sleep for if it performed + /// no action. + /// + /// # Returns + /// + /// * `true` - if an action was performed during the poll + /// * `false` - if no action was performed during the poll and the poll slept + /// pub fn poll(&mut self, sleep_for: u64) -> Result { let did_something = if self.is_connected() { self.handle_publish_responses() @@ -508,8 +580,117 @@ impl Session { Ok(did_something) } + + /// Sends a [`FindServersRequest`] to the server denoted by the discovery url. + /// + /// See OPC UA Part 4 - Services 5.4.2 for complete description of the service and error responses. + /// + /// # Arguments + /// + /// * `endpoint_url` - The network address that the Client used to access the Discovery Endpoint. + /// + /// # Returns + /// + /// * `Ok(Vec)` - A list of [`ApplicationDescription`] that meet criteria specified in the request. + /// * `Err(StatusCode)` - Request failed, status code is the reason for failure + /// + /// [`FindServersRequest`]: ./struct.FindServersRequest.html + /// [`ApplicationDescription`]: ./struct.ApplicationDescription.html + /// + pub fn find_servers(&mut self, endpoint_url: T) -> Result, StatusCode> where T: Into { + let request = FindServersRequest { + request_header: self.make_request_header(), + endpoint_url: endpoint_url.into(), + locale_ids: None, + server_uris: None, + }; + let response = self.send_request(request)?; + if let SupportedMessage::FindServersResponse(response) = response { + crate::process_service_result(&response.response_header)?; + let servers = if let Some(servers) = response.servers { + servers + } else { + Vec::new() + }; + Ok(servers) + } else { + Err(crate::process_unexpected_response(response)) + } + } + + /// Obtain the list of endpoints supported by the server by sending it a [`GetEndpointsRequest`]. + /// + /// See OPC UA Part 4 - Services 5.4.4 for complete description of the service and error responses. + /// + /// # Returns + /// + /// * `Ok(Vec)` - A list of endpoints supported by the server + /// * `Err(StatusCode)` - Request failed, status code is the reason for failure + /// + /// [`GetEndpointsRequest`]: ./struct.GetEndpointsRequest.html + /// + pub fn get_endpoints(&mut self) -> Result, StatusCode> { + debug!("get_endpoints"); + let endpoint_url = self.session_info.endpoint.endpoint_url.clone(); + let request = GetEndpointsRequest { + request_header: self.make_request_header(), + endpoint_url, + locale_ids: None, + profile_uris: None, + }; + + let response = self.send_request(request)?; + if let SupportedMessage::GetEndpointsResponse(response) = response { + crate::process_service_result(&response.response_header)?; + if response.endpoints.is_none() { + debug!("get_endpoints, success but no endpoints"); + Ok(Vec::new()) + } else { + debug!("get_endpoints, success"); + Ok(response.endpoints.unwrap()) + } + } else { + error!("get_endpoints failed {:?}", response); + Err(crate::process_unexpected_response(response)) + } + } + + /// This function is used by servers that wish to register themselves with a discovery server. + /// i.e. one server is the client to another server. The server sends a [`RegisterServerRequest`] + /// to the discovery server to register itself. Servers are expected to re-register themselves periodically + /// with the discovery server, with a maximum of 10 minute intervals. + /// + /// See OPC UA Part 4 - Services 5.4.5 for complete description of the service and error responses. + /// + /// # Arguments + /// + /// * `server` - The server to register + /// + /// # Returns + /// + /// * `Ok(())` - Success + /// * `Err(StatusCode)` - Request failed, status code is the reason for failure + /// + /// [`RegisterServerRequest`]: ./struct.RegisterServerRequest.html + /// + pub fn register_server(&mut self, server: RegisteredServer) -> Result<(), StatusCode> { + let request = RegisterServerRequest { + request_header: self.make_request_header(), + server, + }; + let response = self.send_request(request)?; + if let SupportedMessage::RegisterServerResponse(response) = response { + crate::process_service_result(&response.response_header)?; + Ok(()) + } else { + Err(crate::process_unexpected_response(response)) + } + } + /// Sends an [`OpenSecureChannelRequest`] to the server /// + /// + /// See OPC UA Part 4 - Services 5.5.2 for complete description of the service and error responses. /// # Returns /// /// * `Ok(())` - Success @@ -526,6 +707,8 @@ impl Session { /// Sends a [`CloseSecureChannelRequest`] to the server which will cause the server to drop /// the connection. /// + /// See OPC UA Part 4 - Services 5.5.3 for complete description of the service and error responses. + /// /// # Returns /// /// * `Ok(())` - Success @@ -546,6 +729,8 @@ impl Session { /// session. Internally, the session will store the authentication token which is used for requests /// subsequent to this call. /// + /// See OPC UA Part 4 - Services 5.6.2 for complete description of the service and error responses. + /// /// # Returns /// /// * `Ok(NodeId)` - Success, session id @@ -732,6 +917,7 @@ impl Session { }); } + /// Returns the security policy fn security_policy(&self) -> SecurityPolicy { let secure_channel = trace_read_lock_unwrap!(self.secure_channel); secure_channel.security_policy() @@ -739,6 +925,8 @@ impl Session { /// Sends an [`ActivateSessionRequest`] to the server to activate this session /// + /// See OPC UA Part 4 - Services 5.6.3 for complete description of the service and error responses. + /// /// # Returns /// /// * `Ok(())` - Success @@ -804,7 +992,9 @@ impl Session { } } - /// Sends a [`CancelRequest`] to the server to cancel an outstanding service request. + /// Cancels an outstanding service request by sending a [`CancelRequest`] to the server. + /// + /// See OPC UA Part 4 - Services 5.6.5 for complete description of the service and error responses. /// /// # Arguments /// @@ -831,118 +1021,176 @@ impl Session { } } - /// Sends a [`FindServersRequest`] to the server denoted by the discovery url + /// Add nodes by sending a [`AddNodesRequest`] to the server. + /// + /// See OPC UA Part 4 - Services 5.7.2 for complete description of the service and error responses. /// /// # Arguments /// - /// * `endpoint_url` - The network address that the Client used to access the Discovery Endpoint. + /// * `nodes_to_add` - A list of [`AddNodesItem`] to be added to the server. /// /// # Returns /// - /// * `Ok(Vec)` - Success, list of Servers that meet criteria specified in the request. - /// * `Err(StatusCode)` - Request failed, status code is the reason for failure + /// * `Ok(Vec)` - A list of [`AddNodesResult`] corresponding to each add node operation. + /// * `Err(StatusCode)` - Status code reason for failure. /// - /// [`FindServersRequest`]: ./struct.FindServersRequest.html + /// [`AddNodesRequest`]: ./struct.AddNodesRequest.html + /// [`AddNodesItem`]: ./struct.AddNodesItem.html + /// [`AddNodesResult`]: ./struct.AddNodesResult.html /// - pub fn find_servers(&mut self, endpoint_url: T) -> Result, StatusCode> where T: Into { - let request = FindServersRequest { - request_header: self.make_request_header(), - endpoint_url: endpoint_url.into(), - locale_ids: None, - server_uris: None, - }; - let response = self.send_request(request)?; - if let SupportedMessage::FindServersResponse(response) = response { - crate::process_service_result(&response.response_header)?; - let servers = if let Some(servers) = response.servers { - servers - } else { - Vec::new() - }; - Ok(servers) + pub fn add_nodes(&mut self, nodes_to_add: &[AddNodesItem]) -> Result, StatusCode> { + if nodes_to_add.is_empty() { + error!("add_nodes, called with no nodes to add"); + Err(StatusCode::BadNothingToDo) } else { - Err(crate::process_unexpected_response(response)) + let request = AddNodesRequest { + request_header: self.make_request_header(), + nodes_to_add: Some(nodes_to_add.to_vec()), + }; + let response = self.send_request(request)?; + if let SupportedMessage::AddNodesResponse(response) = response { + Ok(response.results.unwrap()) + } else { + Err(crate::process_unexpected_response(response)) + } } } - /// Sends a [`RegisterServerRequest`] to the discovery server to register a server. Although - /// this function appears in the client API, it is for the benefit of servers that wish to - /// register with a discovery server. Servers are expected to re-register themselves periodically - /// with the discovery server, with a maximum of 10 minute intervals. + /// Add references by sending a [`AddReferencesRequest`] to the server. + /// + /// See OPC UA Part 4 - Services 5.7.3 for complete description of the service and error responses. /// /// # Arguments /// - /// * `server` - The server to register + /// * `references_to_add` - A list of [`AddReferencesItem`] to be sent to the server. /// /// # Returns /// - /// * `Ok(())` - Success - /// * `Err(StatusCode)` - Request failed, status code is the reason for failure + /// * `Ok(Vec)` - A list of `StatusCode` corresponding to each add reference operation. + /// * `Err(StatusCode)` - Status code reason for failure. /// - /// [`RegisterServerRequest`]: ./struct.FindServersRequest.html + /// [`AddReferencesRequest`]: ./struct.AddReferencesRequest.html + /// [`AddReferencesItem`]: ./struct.AddReferencesItem.html /// - pub fn register_server(&mut self, server: RegisteredServer) -> Result<(), StatusCode> { - let request = RegisterServerRequest { - request_header: self.make_request_header(), - server, - }; - let response = self.send_request(request)?; - if let SupportedMessage::RegisterServerResponse(response) = response { - crate::process_service_result(&response.response_header)?; - Ok(()) + pub fn add_references(&mut self, references_to_add: &[AddReferencesItem]) -> Result, StatusCode> { + if references_to_add.is_empty() { + error!("add_references, called with no references to add"); + Err(StatusCode::BadNothingToDo) } else { - Err(crate::process_unexpected_response(response)) - } - } - - /// Returns the subscription state object - pub fn subscription_state(&self) -> Arc> { - self.subscription_state.clone() - } - - /// Sends a GetEndpoints request to the server - pub fn get_endpoints(&mut self) -> Result, StatusCode> { - debug!("get_endpoints"); - let endpoint_url = self.session_info.endpoint.endpoint_url.clone(); - let request = GetEndpointsRequest { - request_header: self.make_request_header(), - endpoint_url, - locale_ids: None, - profile_uris: None, - }; - - let response = self.send_request(request)?; - if let SupportedMessage::GetEndpointsResponse(response) = response { - crate::process_service_result(&response.response_header)?; - if response.endpoints.is_none() { - debug!("get_endpoints, success but no endpoints"); - Ok(Vec::new()) + let request = AddReferencesRequest { + request_header: self.make_request_header(), + references_to_add: Some(references_to_add.to_vec()), + }; + let response = self.send_request(request)?; + if let SupportedMessage::AddReferencesResponse(response) = response { + Ok(response.results.unwrap()) } else { - debug!("get_endpoints, success"); - Ok(response.endpoints.unwrap()) + Err(crate::process_unexpected_response(response)) } - } else { - error!("get_endpoints failed {:?}", response); - Err(crate::process_unexpected_response(response)) } } - /// Sends a BrowseRequest to the server - pub fn browse(&mut self, nodes_to_browse: &[BrowseDescription]) -> Result>, StatusCode> { - if nodes_to_browse.is_empty() { - error!("browse, was not supplied with any nodes to browse"); - Err(StatusCode::BadNothingToDo) - } else { - let request = BrowseRequest { - request_header: self.make_request_header(), - view: ViewDescription { - view_id: NodeId::null(), - timestamp: DateTime::now(), - view_version: 0, - }, - requested_max_references_per_node: 1000, - nodes_to_browse: Some(nodes_to_browse.to_vec()), - }; + /// Delete nodes by sending a [`DeleteNodesRequest`] to the server. + /// + /// See OPC UA Part 4 - Services 5.7.4 for complete description of the service and error responses. + /// + /// # Arguments + /// + /// * `nodes_to_delete` - A list of [`DeleteNodesItem`] to be sent to the server. + /// + /// # Returns + /// + /// * `Ok(Vec)` - A list of `StatusCode` corresponding to each delete node operation. + /// * `Err(StatusCode)` - Status code reason for failure. + /// + /// [`DeleteNodesRequest`]: ./struct.DeleteNodesRequest.html + /// [`DeleteNodesItem`]: ./struct.DeleteNodesItem.html + /// + pub fn delete_nodes(&mut self, nodes_to_delete: &[DeleteNodesItem]) -> Result, StatusCode> { + if nodes_to_delete.is_empty() { + error!("delete_nodes, called with no nodes to delete"); + Err(StatusCode::BadNothingToDo) + } else { + let request = DeleteNodesRequest { + request_header: self.make_request_header(), + nodes_to_delete: Some(nodes_to_delete.to_vec()), + }; + let response = self.send_request(request)?; + if let SupportedMessage::DeleteNodesResponse(response) = response { + Ok(response.results.unwrap()) + } else { + Err(crate::process_unexpected_response(response)) + } + } + } + + /// Delete references by sending a [`DeleteReferencesRequest`] to the server. + /// + /// See OPC UA Part 4 - Services 5.7.5 for complete description of the service and error responses. + /// + /// # Arguments + /// + /// * `nodes_to_delete` - A list of [`DeleteReferencesItem`] to be sent to the server. + /// + /// # Returns + /// + /// * `Ok(Vec)` - A list of `StatusCode` corresponding to each delete node operation. + /// * `Err(StatusCode)` - Status code reason for failure. + /// + /// [`DeleteReferencesRequest`]: ./struct.DeleteReferencesRequest.html + /// [`DeleteReferencesItem`]: ./struct.DeleteReferencesItem.html + /// + pub fn delete_references(&mut self, references_to_delete: &[DeleteReferencesItem]) -> Result, StatusCode> { + if references_to_delete.is_empty() { + error!("delete_references, called with no references to delete"); + Err(StatusCode::BadNothingToDo) + } else { + let request = DeleteReferencesRequest { + request_header: self.make_request_header(), + references_to_delete: Some(references_to_delete.to_vec()), + }; + let response = self.send_request(request)?; + if let SupportedMessage::DeleteReferencesResponse(response) = response { + Ok(response.results.unwrap()) + } else { + Err(crate::process_unexpected_response(response)) + } + } + } + + /// Discover the references to the specified nodes by sending a [`BrowseRequest`] to the server. + /// + /// See OPC UA Part 4 - Services 5.8.2 for complete description of the service and error responses. + /// + /// # Arguments + /// + /// * `nodes_to_browse` - A list of [`BrowseDescription`] describing nodes to browse. + /// + /// # Returns + /// + /// * `Ok(Option)` - A list [`BrowseResult`] corresponding to each node to browse. A browse result + /// may contain a continuation point, for use with `browse_next()`. + /// * `Err(StatusCode)` - Request failed, status code is the reason for failure + /// + /// [`BrowseRequest`]: ./struct.BrowseRequest.html + /// [`BrowseDescription`]: ./struct.BrowseDescription.html + /// [`BrowseResult`]: ./struct.BrowseResult.html + /// + pub fn browse(&mut self, nodes_to_browse: &[BrowseDescription]) -> Result>, StatusCode> { + if nodes_to_browse.is_empty() { + error!("browse, was not supplied with any nodes to browse"); + Err(StatusCode::BadNothingToDo) + } else { + let request = BrowseRequest { + request_header: self.make_request_header(), + view: ViewDescription { + view_id: NodeId::null(), + timestamp: DateTime::now(), + view_version: 0, + }, + requested_max_references_per_node: 1000, + nodes_to_browse: Some(nodes_to_browse.to_vec()), + }; let response = self.send_request(request)?; if let SupportedMessage::BrowseResponse(response) = response { debug!("browse, success"); @@ -955,7 +1203,26 @@ impl Session { } } - /// Sends a BrowseNextRequest to the server + /// Continue to discover references to nodes by sending continuation points in a [`BrowseNextRequest`] + /// to the server. This function may have to be called repeatedly to process the initial query. + /// + /// See OPC UA Part 4 - Services 5.8.3 for complete description of the service and error responses. + /// + /// # Arguments + /// + /// * `release_continuation_points` - Flag indicating if the continuation points should be released by the server + /// * `continuation_points` - A list of [`BrowseDescription`] continuation points + /// + /// # Returns + /// + /// * `Ok(Option)` - A list [`BrowseResult`] corresponding to each node to browse. A browse result + /// may contain a continuation point, for use with `browse_next()`. + /// * `Err(StatusCode)` - Request failed, status code is the reason for failure + /// + /// [`BrowseRequest`]: ./struct.BrowseRequest.html + /// [`BrowseNextRequest`]: ./struct.BrowseNextRequest.html + /// [`BrowseResult`]: ./struct.BrowseResult.html + /// pub fn browse_next(&mut self, release_continuation_points: bool, continuation_points: &[ByteString]) -> Result>, StatusCode> { if continuation_points.is_empty() { error!("browse_next, was not supplied with any continuation points"); @@ -980,15 +1247,21 @@ impl Session { /// Reads the value of nodes by sending a [`ReadRequest`] to the server. /// + /// See OPC UA Part 4 - Services 5.10.2 for complete description of the service and error responses. + /// /// # Arguments /// - /// * `nodes_to_read` - A list of `ReadValueId` to be read by the server. + /// * `nodes_to_read` - A list of [`ReadValueId`] to be read by the server. /// /// # Returns /// - /// * `Ok(Vec)` - A list of datavalues corresponding to each read operation. + /// * `Ok(Vec)` - A list of [`DataValue`] corresponding to each read operation. /// * `Err(StatusCode)` - Status code reason for failure. /// + /// [`ReadRequest`]: ./struct.ReadRequest.html + /// [`ReadValueId`]: ./struct.ReadValueId.html + /// [`DataValue`]: ./struct.DataValue.html + /// pub fn read(&mut self, nodes_to_read: &[ReadValueId]) -> Result>, StatusCode> { if nodes_to_read.is_empty() { // No subscriptions @@ -1016,15 +1289,20 @@ impl Session { /// Writes values to nodes by sending a [`WriteRequest`] to the server. /// + /// See OPC UA Part 4 - Services 5.10.4 for complete description of the service and error responses. + /// /// # Arguments /// - /// * `nodes_to_write` - A list of `WriteValue` to be sent to the server. + /// * `nodes_to_write` - A list of [`WriteValue`] to be sent to the server. /// /// # Returns /// - /// * `Ok(Vec)` - A list of results corresponding to each write operation. + /// * `Ok(Vec)` - A list of `StatusCode` results corresponding to each write operation. /// * `Err(StatusCode)` - Status code reason for failure. /// + /// [`WriteRequest`]: ./struct.WriteRequest.html + /// [`WriteValue`]: ./struct.WriteValue.html + /// pub fn write_value(&mut self, nodes_to_write: &[WriteValue]) -> Result>, StatusCode> { if nodes_to_write.is_empty() { // No subscriptions @@ -1047,117 +1325,371 @@ impl Session { } } - /// Add nodes by sending a [`AddNodesRequest`] to the server. + /// Calls a single method on an object on the server by sending a [`CallRequest`] to the server. + /// + /// See OPC UA Part 4 - Services 5.11.2 for complete description of the service and error responses. /// /// # Arguments /// - /// * `nodes_to_add` - A list of `AddNodesItem` to be added to the server. + /// * `method` - The method to call. Note this function takes anything that can be turned into + /// a [`CallMethodRequest`] which includes a (`NodeId`, `NodeId`, `Option>`) + /// which refers to the object id, method id, and input arguments respectively. + /// * `items_to_delete` - List of Server-assigned ids for the MonitoredItems to be deleted. /// /// # Returns /// - /// * `Ok(Vec)` - Result for each add node operation. + /// * `Ok(CallMethodResult)` - A `[CallMethodResult]` for the Method call. /// * `Err(StatusCode)` - Status code reason for failure. /// - pub fn add_nodes(&mut self, nodes_to_add: &[AddNodesItem]) -> Result, StatusCode> { - if nodes_to_add.is_empty() { - error!("add_nodes, called with no nodes to add"); - Err(StatusCode::BadNothingToDo) - } else { - let request = AddNodesRequest { - request_header: self.make_request_header(), - nodes_to_add: Some(nodes_to_add.to_vec()), - }; - let response = self.send_request(request)?; - if let SupportedMessage::AddNodesResponse(response) = response { - Ok(response.results.unwrap()) + /// [`CallRequest`]: ./struct.CallRequest.html + /// [`CallMethodRequest`]: ./struct.CallMethodRequest.html + /// [`CallMethodResult`]: ./struct.CallMethodResult.html + /// + pub fn call_method(&mut self, method: T) -> Result where T: Into { + debug!("call_method"); + let methods_to_call = Some(vec![method.into()]); + let request = CallRequest { + request_header: self.make_request_header(), + methods_to_call, + }; + let response = self.send_request(request)?; + if let SupportedMessage::CallResponse(response) = response { + if let Some(mut results) = response.results { + if results.len() != 1 { + error!("call_method, expecting a result from the call to the server, got {} results", results.len()); + Err(StatusCode::BadUnexpectedError) + } else { + Ok(results.remove(0)) + } } else { - Err(crate::process_unexpected_response(response)) + error!("call_method, expecting a result from the call to the server, got nothing"); + Err(StatusCode::BadUnexpectedError) } + } else { + Err(crate::process_unexpected_response(response)) } } - /// Add references by sending a [`AddReferencesRequest`] to the server. + /// Calls GetMonitoredItems via call_method(), putting a sane interface on the input / output. /// /// # Arguments /// - /// * `references_to_add` - A list of `AddReferencesItem` to be sent to the server. + /// * `subscription_id` - Server allocated identifier for the subscription to return monitored items for. /// /// # Returns /// - /// * `Ok(Vec)` - Result for each add reference operation. + /// * `Ok((Vec, Vec))` - Result for call, consisting a list of (monitored_item_id, client_handle) /// * `Err(StatusCode)` - Status code reason for failure. /// - pub fn add_references(&mut self, references_to_add: &[AddReferencesItem]) -> Result, StatusCode> { - if references_to_add.is_empty() { - error!("add_references, called with no references to add"); - Err(StatusCode::BadNothingToDo) - } else { - let request = AddReferencesRequest { - request_header: self.make_request_header(), - references_to_add: Some(references_to_add.to_vec()), - }; - let response = self.send_request(request)?; - if let SupportedMessage::AddReferencesResponse(response) = response { - Ok(response.results.unwrap()) + pub fn call_get_monitored_items(&mut self, subscription_id: u32) -> Result<(Vec, Vec), StatusCode> { + let args = Some(vec![Variant::from(subscription_id)]); + let object_id: NodeId = ObjectId::Server.into(); + let method_id: NodeId = MethodId::Server_GetMonitoredItems.into(); + let request: CallMethodRequest = (object_id, method_id, args).into(); + let response = self.call_method(request)?; + if let Some(mut result) = response.output_arguments { + if result.len() == 2 { + let server_handles = result.remove(0).as_u32_array().map_err(|_| StatusCode::BadUnexpectedError)?; + let client_handles = result.remove(0).as_u32_array().map_err(|_| StatusCode::BadUnexpectedError)?; + Ok((server_handles, client_handles)) } else { - Err(crate::process_unexpected_response(response)) + error!("Expected a result with 2 args and didn't get it."); + Err(StatusCode::BadUnexpectedError) } + } else { + error!("Expected a result and didn't get it."); + Err(StatusCode::BadUnexpectedError) } } - /// Delete nodes by sending a [`DeleteNodesRequest`] to the server. + /// Creates monitored items on a subscription by sending a [`CreateMonitoredItemsRequest`] to the server. + /// + /// See OPC UA Part 4 - Services 5.12.2 for complete description of the service and error responses. /// /// # Arguments /// - /// * `nodes_to_delete` - A list of `DeleteNodesItem` to be sent to the server. + /// * `subscription_id` - The Server-assigned identifier for the Subscription that will report Notifications for this MonitoredItem + /// * `timestamps_to_return` - An enumeration that specifies the timestamp Attributes to be transmitted for each MonitoredItem. + /// * `items_to_create` - A list of [`MonitoredItemCreateRequest`] to be created and assigned to the specified Subscription. /// /// # Returns /// - /// * `Ok(Vec)` - Result for each delete node operation. - /// * `Err(StatusCode)` - Status code reason for failure. + /// * `Ok(Vec)` - A list of [`MonitoredItemCreateResult`] corresponding to the items to create. + /// The size and order of the list matches the size and order of the `items_to_create` request parameter. + /// * `Err(StatusCode)` - Status code reason for failure /// - pub fn delete_nodes(&mut self, nodes_to_delete: &[DeleteNodesItem]) -> Result, StatusCode> { - if nodes_to_delete.is_empty() { - error!("delete_nodes, called with no nodes to delete"); + /// [`CreateMonitoredItemsRequest`]: ./struct.CreateMonitoredItemsRequest.html + /// [`MonitoredItemCreateRequest`]: ./struct.MonitoredItemCreateRequest.html + /// [`MonitoredItemCreateResult`]: ./struct.MonitoredItemCreateResult.html + /// + pub fn create_monitored_items(&mut self, subscription_id: u32, timestamps_to_return: TimestampsToReturn, items_to_create: &[MonitoredItemCreateRequest]) -> Result, StatusCode> { + debug!("create_monitored_items, for subscription {}, {} items", subscription_id, items_to_create.len()); + if subscription_id == 0 { + error!("create_monitored_items, subscription id 0 is invalid"); + Err(StatusCode::BadInvalidArgument) + } else if !self.subscription_exists(subscription_id) { + error!("create_monitored_items, subscription id {} does not exist", subscription_id); + Err(StatusCode::BadInvalidArgument) + } else if items_to_create.is_empty() { + error!("create_monitored_items, called with no items to create"); Err(StatusCode::BadNothingToDo) } else { - let request = DeleteNodesRequest { + // Assign each item a unique client handle + let mut items_to_create = items_to_create.to_vec(); + { + let mut session_state = trace_write_lock_unwrap!(self.session_state); + items_to_create.iter_mut().for_each(|i| { + i.requested_parameters.client_handle = session_state.next_monitored_item_handle(); + }); + } + + let request = CreateMonitoredItemsRequest { request_header: self.make_request_header(), - nodes_to_delete: Some(nodes_to_delete.to_vec()), + subscription_id, + timestamps_to_return, + items_to_create: Some(items_to_create.clone()), }; let response = self.send_request(request)?; - if let SupportedMessage::DeleteNodesResponse(response) = response { + if let SupportedMessage::CreateMonitoredItemsResponse(response) = response { + crate::process_service_result(&response.response_header)?; + if let Some(ref results) = response.results { + debug!("create_monitored_items, {} items created", items_to_create.len()); + // Set the items in our internal state + let items_to_create = items_to_create.iter() + .zip(results) + .map(|(i, r)| { + subscription::CreateMonitoredItem { + id: r.monitored_item_id, + client_handle: i.requested_parameters.client_handle, + discard_oldest: i.requested_parameters.discard_oldest, + item_to_monitor: i.item_to_monitor.clone(), + monitoring_mode: i.monitoring_mode, + queue_size: r.revised_queue_size, + sampling_interval: r.revised_sampling_interval, + } + }) + .collect::>(); + { + let mut subscription_state = trace_write_lock_unwrap!(self.subscription_state); + subscription_state.insert_monitored_items(subscription_id, &items_to_create); + } + } else { + debug!("create_monitored_items, success but no monitored items were created"); + } Ok(response.results.unwrap()) } else { + error!("create_monitored_items failed {:?}", response); Err(crate::process_unexpected_response(response)) } } } - /// Delete references by sending a [`DeleteReferencesRequest`] to the server. + /// Modifies monitored items on a subscription by sending a [`ModifyMonitoredItemsRequest`] to the server. + /// + /// See OPC UA Part 4 - Services 5.12.3 for complete description of the service and error responses. /// /// # Arguments /// - /// * `nodes_to_delete` - A list of `DeleteReferencesItem` to be sent to the server. + /// * `subscription_id` - The Server-assigned identifier for the Subscription that will report Notifications for this MonitoredItem. + /// * `timestamps_to_return` - An enumeration that specifies the timestamp Attributes to be transmitted for each MonitoredItem. + /// * `items_to_modify` - The list of [`MonitoredItemModifyRequest`] to modify. /// /// # Returns /// - /// * `Ok(Vec)` - Result for each delete node operation. + /// * `Ok(Vec)` - A list of [`MonitoredItemModifyResult`] corresponding to the MonitoredItems to modify. + /// The size and order of the list matches the size and order of the `items_to_modify` request parameter. /// * `Err(StatusCode)` - Status code reason for failure. /// - pub fn delete_references(&mut self, references_to_delete: &[DeleteReferencesItem]) -> Result, StatusCode> { - if references_to_delete.is_empty() { - error!("delete_references, called with no references to delete"); - Err(StatusCode::BadNothingToDo) - } else { - let request = DeleteReferencesRequest { - request_header: self.make_request_header(), - references_to_delete: Some(references_to_delete.to_vec()), - }; - let response = self.send_request(request)?; - if let SupportedMessage::DeleteReferencesResponse(response) = response { + /// [`ModifyMonitoredItemsRequest`]: ./struct.ModifyMonitoredItemsRequest.html + /// [`MonitoredItemModifyRequest`]: ./struct.MonitoredItemModifyRequest.html + /// [`MonitoredItemModifyResult`]: ./struct.MonitoredItemModifyResult.html + /// + pub fn modify_monitored_items(&mut self, subscription_id: u32, timestamps_to_return: TimestampsToReturn, items_to_modify: &[MonitoredItemModifyRequest]) -> Result, StatusCode> { + debug!("modify_monitored_items, for subscription {}, {} items", subscription_id, items_to_modify.len()); + if subscription_id == 0 { + error!("modify_monitored_items, subscription id 0 is invalid"); + Err(StatusCode::BadInvalidArgument) + } else if !self.subscription_exists(subscription_id) { + error!("modify_monitored_items, subscription id {} does not exist", subscription_id); + Err(StatusCode::BadInvalidArgument) + } else if items_to_modify.is_empty() { + error!("modify_monitored_items, called with no items to modify"); + Err(StatusCode::BadNothingToDo) + } else { + let monitored_item_ids = items_to_modify.iter() + .map(|i| i.monitored_item_id) + .collect::>(); + let request = ModifyMonitoredItemsRequest { + request_header: self.make_request_header(), + subscription_id, + timestamps_to_return, + items_to_modify: Some(items_to_modify.to_vec()), + }; + let response = self.send_request(request)?; + if let SupportedMessage::ModifyMonitoredItemsResponse(response) = response { + crate::process_service_result(&response.response_header)?; + if let Some(ref results) = response.results { + // Set the items in our internal state + let items_to_modify = monitored_item_ids.iter() + .zip(results.iter()) + .map(|(id, r)| { + subscription::ModifyMonitoredItem { + id: *id, + queue_size: r.revised_queue_size, + sampling_interval: r.revised_sampling_interval, + } + }) + .collect::>(); + { + let mut subscription_state = trace_write_lock_unwrap!(self.subscription_state); + subscription_state.modify_monitored_items(subscription_id, &items_to_modify); + } + } + debug!("modify_monitored_items, success"); + Ok(response.results.unwrap()) + } else { + error!("modify_monitored_items failed {:?}", response); + Err(crate::process_unexpected_response(response)) + } + } + } + + /// Sets the monitoring mode on one or more monitored items by sending a [`SetMonitoringModeRequest`] + /// to the server. + /// + /// See OPC UA Part 4 - Services 5.12.4 for complete description of the service and error responses. + /// + /// # Arguments + /// + /// * `subscription_id` - the subscription identifier containing the monitored items to be modified. + /// * `monitoring_mode` - the monitored mode to apply to the monitored items + /// * `monitored_item_ids` - the monitored items to be modified + /// + /// # Returns + /// + /// * `Ok(Vec)` - Individual result for each monitored item. + /// * `Err(StatusCode)` - Status code reason for failure. + /// + /// [`SetMonitoringModeRequest`]: ./struct.SetMonitoringModeRequest.html + /// + pub fn set_monitoring_mode(&mut self, subscription_id: u32, monitoring_mode: MonitoringMode, monitored_item_ids: &[u32]) -> Result, StatusCode> { + if monitored_item_ids.is_empty() { + error!("set_monitoring_mode, called with nothing to do"); + Err(StatusCode::BadNothingToDo) + } else { + let request = { + let monitored_item_ids = Some(monitored_item_ids.to_vec()); + SetMonitoringModeRequest { + request_header: self.make_request_header(), + subscription_id, + monitoring_mode, + monitored_item_ids, + } + }; + let response = self.send_request(request)?; + if let SupportedMessage::SetMonitoringModeResponse(response) = response { + Ok(response.results.unwrap()) + } else { + error!("set_monitoring_mode failed {:?}", response); + Err(crate::process_unexpected_response(response)) + } + } + } + + /// Sets a monitored item so it becomes the trigger that causes other monitored items to send + /// change events in the same update. Sends a [`SetTriggeringRequest`] to the server. + /// Note that `items_to_remove` is applied before `items_to_add`. + /// + /// See OPC UA Part 4 - Services 5.12.5 for complete description of the service and error responses. + /// + /// # Arguments + /// + /// * `subscription_id` - the subscription identifier containing the monitored item to be used as the trigger. + /// * `monitored_item_id` - the monitored item that is the trigger. + /// * `links_to_add` - zero or more items to be added to the monitored item's triggering list. + /// * `items_to_remove` - zero or more items to be removed from the monitored item's triggering list. + /// + /// # Returns + /// + /// * `Ok((Option>, Option>))` - Individual result for each item added / removed for the SetTriggering call. + /// * `Err(StatusCode)` - Status code reason for failure. + /// + /// [`SetTriggeringRequest`]: ./struct.SetTriggeringRequest.html + /// + pub fn set_triggering(&mut self, subscription_id: u32, triggering_item_id: u32, links_to_add: &[u32], links_to_remove: &[u32]) -> Result<(Option>, Option>), StatusCode> { + if links_to_add.is_empty() && links_to_remove.is_empty() { + error!("set_triggering, called with nothing to add or remove"); + Err(StatusCode::BadNothingToDo) + } else { + let request = { + let links_to_add = if links_to_add.is_empty() { None } else { Some(links_to_add.to_vec()) }; + let links_to_remove = if links_to_remove.is_empty() { None } else { Some(links_to_remove.to_vec()) }; + SetTriggeringRequest { + request_header: self.make_request_header(), + subscription_id, + triggering_item_id, + links_to_add, + links_to_remove, + } + }; + let response = self.send_request(request)?; + if let SupportedMessage::SetTriggeringResponse(response) = response { + // Update client side state + let mut subscription_state = trace_write_lock_unwrap!(self.subscription_state); + subscription_state.set_triggering(subscription_id, triggering_item_id, links_to_add, links_to_remove); + Ok((response.add_results, response.remove_results)) + } else { + error!("set_triggering failed {:?}", response); + Err(crate::process_unexpected_response(response)) + } + } + } + + /// Deletes monitored items from a subscription by sending a [`DeleteMonitoredItemsRequest`] to the server. + /// + /// See OPC UA Part 4 - Services 5.12.6 for complete description of the service and error responses. + /// + /// # Arguments + /// + /// * `subscription_id` - The Server-assigned identifier for the Subscription that will report Notifications for this MonitoredItem. + /// * `items_to_delete` - List of Server-assigned ids for the MonitoredItems to be deleted. + /// + /// # Returns + /// + /// * `Ok(Vec)` - List of StatusCodes for the MonitoredItems to delete. The size and + /// order of the list matches the size and order of the `items_to_delete` request parameter. + /// * `Err(StatusCode)` - Status code reason for failure. + /// + /// [`DeleteMonitoredItemsRequest`]: ./struct.DeleteMonitoredItemsRequest.html + /// + pub fn delete_monitored_items(&mut self, subscription_id: u32, items_to_delete: &[u32]) -> Result, StatusCode> { + debug!("delete_monitored_items, subscription {} for {} items", subscription_id, items_to_delete.len()); + if subscription_id == 0 { + error!("delete_monitored_items, subscription id 0 is invalid"); + Err(StatusCode::BadInvalidArgument) + } else if !self.subscription_exists(subscription_id) { + error!("delete_monitored_items, subscription id {} does not exist", subscription_id); + Err(StatusCode::BadInvalidArgument) + } else if items_to_delete.is_empty() { + error!("delete_monitored_items, called with no items to delete"); + Err(StatusCode::BadNothingToDo) + } else { + let request = DeleteMonitoredItemsRequest { + request_header: self.make_request_header(), + subscription_id, + monitored_item_ids: Some(items_to_delete.to_vec()), + }; + let response = self.send_request(request)?; + if let SupportedMessage::DeleteMonitoredItemsResponse(response) = response { + crate::process_service_result(&response.response_header)?; + if response.results.is_some() { + let mut subscription_state = trace_write_lock_unwrap!(self.subscription_state); + subscription_state.delete_monitored_items(subscription_id, items_to_delete); + } + debug!("delete_monitored_items, success"); Ok(response.results.unwrap()) } else { + error!("delete_monitored_items failed {:?}", response); Err(crate::process_unexpected_response(response)) } } @@ -1165,6 +1697,8 @@ impl Session { /// Create a subscription by sending a [`CreateSubscriptionRequest`] to the server. /// + /// See OPC UA Part 4 - Services 5.13.2 for complete description of the service and error responses. + /// /// # Arguments /// /// * `publishing_interval` - The requested publishing interval defines the cyclic rate that @@ -1255,6 +1789,8 @@ impl Session { /// Modifies a subscription by sending a [`ModifySubscriptionRequest`] to the server. /// + /// See OPC UA Part 4 - Services 5.13.3 for complete description of the service and error responses. + /// /// # Arguments /// /// * `subscription_id` - subscription identifier returned from `create_subscription`. @@ -1304,120 +1840,72 @@ impl Session { } } - /// Deletes a subscription by sending a [`DeleteSubscriptionsRequest`] to the server. - /// - /// # Arguments - /// - /// * `subscription_id` - subscription identifier returned from `create_subscription`. - /// - /// # Returns + /// Changes the publishing mode of subscriptions by sending a [`SetPublishingModeRequest`] to the server. /// - /// * `Ok(StatusCode)` - Service return code for the delete action, `Good` or `BadSubscriptionIdInvalid` - /// * `Err(StatusCode)` - Status code reason for failure - /// - /// [`DeleteSubscriptionsRequest`]: ./struct.DeleteSubscriptionsRequest.html - /// - pub fn delete_subscription(&mut self, subscription_id: u32) -> Result { - if subscription_id == 0 { - error!("delete_subscription, subscription id 0 is invalid"); - Err(StatusCode::BadInvalidArgument) - } else if !self.subscription_exists(subscription_id) { - error!("delete_subscription, subscription id {} does not exist", subscription_id); - Err(StatusCode::BadInvalidArgument) - } else { - let result = self.delete_subscriptions(&[subscription_id][..])?; - Ok(result[0]) - } - } - - /// Deletes subscriptions by sending a [`DeleteSubscriptionsRequest`] to the server with the list - /// of subscriptions to delete. + /// See OPC UA Part 4 - Services 5.13.4 for complete description of the service and error responses. /// /// # Arguments /// - /// * `subscription_ids` - List of subscription identifiers to delete. + /// * `subscription_ids` - one or more subscription identifiers. + /// * `publishing_enabled` - A boolean parameter with the following values - `true` publishing + /// is enabled for the Subscriptions, `false`, publishing is disabled for the Subscriptions. /// /// # Returns /// - /// * `Ok(Vec)` - List of result for delete action on each id, `Good` or `BadSubscriptionIdInvalid` - /// The size and order of the list matches the size and order of the input. + /// * `Ok(Vec)` - Service return code for the action for each id, `Good` or `BadSubscriptionIdInvalid` /// * `Err(StatusCode)` - Status code reason for failure /// - /// [`DeleteSubscriptionsRequest`]: ./struct.DeleteSubscriptionsRequest.html + /// [`SetPublishingModeRequest`]: ./struct.SetPublishingModeRequest.html /// - pub fn delete_subscriptions(&mut self, subscription_ids: &[u32]) -> Result, StatusCode> { + pub fn set_publishing_mode(&mut self, subscription_ids: &[u32], publishing_enabled: bool) -> Result, StatusCode> { + debug!("set_publishing_mode, for subscriptions {:?}, publishing enabled {}", subscription_ids, publishing_enabled); if subscription_ids.is_empty() { // No subscriptions - trace!("delete_subscriptions with no subscriptions"); + error!("set_publishing_mode, no subscription ids were provided"); Err(StatusCode::BadNothingToDo) } else { - // Send a delete request holding all the subscription ides that we wish to delete - let request = DeleteSubscriptionsRequest { + let request = SetPublishingModeRequest { request_header: self.make_request_header(), + publishing_enabled, subscription_ids: Some(subscription_ids.to_vec()), }; let response = self.send_request(request)?; - if let SupportedMessage::DeleteSubscriptionsResponse(response) = response { + if let SupportedMessage::SetPublishingModeResponse(response) = response { crate::process_service_result(&response.response_header)?; { - // Clear out deleted subscriptions, assuming the delete worked + // Clear out all subscriptions, assuming the delete worked let mut subscription_state = trace_write_lock_unwrap!(self.subscription_state); - subscription_ids.iter().for_each(|id| { - let _ = subscription_state.delete_subscription(*id); - }); + subscription_state.set_publishing_mode(subscription_ids, publishing_enabled); } - debug!("delete_subscriptions success"); + debug!("set_publishing_mode success"); Ok(response.results.unwrap()) } else { - error!("delete_subscriptions failed {:?}", response); + error!("set_publishing_mode failed {:?}", response); Err(crate::process_unexpected_response(response)) } } } - /// Deletes all subscriptions by sending a [`DeleteSubscriptionsRequest`] to the server with - /// ids for all subscriptions. + /// Transfers Subscriptions and their MonitoredItems from one Session to another. For example, + /// a Client may need to reopen a Session and then transfer its Subscriptions to that Session. + /// It may also be used by one Client to take over a Subscription from another Client by + /// transferring the Subscription to its Session. + /// + /// See OPC UA Part 4 - Services 5.13.7 for complete description of the service and error responses. + /// + /// * `subscription_ids` - one or more subscription identifiers. + /// * `send_initial_values` - A boolean parameter with the following values - `true` the first + /// publish response shall contain the current values of all monitored items in the subscription, + /// `false`, the first publish response shall contain only the value changes since the last + /// publish response was sent. /// /// # Returns /// - /// * `Ok(Vec<(u32, StatusCode)>)` - List of (id, status code) result for delete action on each id, `Good` or `BadSubscriptionIdInvalid` - /// * `Err(StatusCode)` - Status code reason for failure - /// - /// [`DeleteSubscriptionsRequest`]: ./struct.DeleteSubscriptionsRequest.html - /// - pub fn delete_all_subscriptions(&mut self) -> Result, StatusCode> { - let subscription_ids = { - let subscription_state = trace_read_lock_unwrap!(self.subscription_state); - subscription_state.subscription_ids() - }; - if let Some(ref subscription_ids) = subscription_ids { - let status_codes = self.delete_subscriptions(subscription_ids.as_slice())?; - // Return a list of (id, status_code) for each subscription - Ok(subscription_ids.iter().zip(status_codes).map(|(id, status_code)| (*id, status_code)).collect()) - } else { - // No subscriptions - trace!("delete_all_subscriptions, called when there are no subscriptions"); - Err(StatusCode::BadNothingToDo) - } - } - - /// Transfers Subscriptions and their MonitoredItems from one Session to another. For example, - /// a Client may need to reopen a Session and then transfer its Subscriptions to that Session. - /// It may also be used by one Client to take over a Subscription from another Client by - /// transferring the Subscription to its Session. - /// - /// * `subscription_ids` - one or more subscription identifiers. - /// * `send_initial_values` - A boolean parameter with the following values - `true` the first - /// publish response shall contain the current values of all monitored items in the subscription, - /// `false`, the first publish response shall contain only the value changes since the last - /// publish response was sent. - /// - /// # Returns - /// - /// * `Ok(Vec)` - Service return code for each transfer subscription. + /// * `Ok(Vec)` - The [`TransferResult`] for each transfer subscription. /// * `Err(StatusCode)` - Status code reason for failure /// /// [`TransferSubscriptionsRequest`]: ./struct.TransferSubscriptionsRequest.html + /// [`TransferResult`]: ./struct.TransferResult.html /// pub fn transfer_subscriptions(&mut self, subscription_ids: &[u32], send_initial_values: bool) -> Result, StatusCode> { if subscription_ids.is_empty() { @@ -1442,402 +1930,110 @@ impl Session { } } - /// Changes the publishing mode of subscriptiongs by sending a [`SetPublishingModeRequest`] to the server. - /// - /// # Arguments - /// - /// * `subscription_ids` - one or more subscription identifiers. - /// * `publishing_enabled` - A boolean parameter with the following values - `true` publishing - /// is enabled for the Subscriptions, `false`, publishing is disabled for the Subscriptions. - /// - /// # Returns - /// - /// * `Ok(Vec)` - Service return code for the action for each id, `Good` or `BadSubscriptionIdInvalid` - /// * `Err(StatusCode)` - Status code reason for failure - /// - /// [`SetPublishingModeRequest`]: ./struct.SetPublishingModeRequest.html + /// Deletes a subscription by sending a [`DeleteSubscriptionsRequest`] to the server. /// - pub fn set_publishing_mode(&mut self, subscription_ids: &[u32], publishing_enabled: bool) -> Result, StatusCode> { - debug!("set_publishing_mode, for subscriptions {:?}, publishing enabled {}", subscription_ids, publishing_enabled); - if subscription_ids.is_empty() { - // No subscriptions - error!("set_publishing_mode, no subscription ids were provided"); - Err(StatusCode::BadNothingToDo) - } else { - let request = SetPublishingModeRequest { - request_header: self.make_request_header(), - publishing_enabled, - subscription_ids: Some(subscription_ids.to_vec()), - }; - let response = self.send_request(request)?; - if let SupportedMessage::SetPublishingModeResponse(response) = response { - crate::process_service_result(&response.response_header)?; - { - // Clear out all subscriptions, assuming the delete worked - let mut subscription_state = trace_write_lock_unwrap!(self.subscription_state); - subscription_state.set_publishing_mode(subscription_ids, publishing_enabled); - } - debug!("set_publishing_mode success"); - Ok(response.results.unwrap()) - } else { - error!("set_publishing_mode failed {:?}", response); - Err(crate::process_unexpected_response(response)) - } - } - } - - /// Creates monitored items on a subscription by sending a [`CreateMonitoredItemsRequest`] to the server. + /// See OPC UA Part 4 - Services 5.13.8 for complete description of the service and error responses. /// /// # Arguments /// - /// * `subscription_id` - The Server-assigned identifier for the Subscription that will report Notifications for this MonitoredItem - /// * `timestamps_to_return` - An enumeration that specifies the timestamp Attributes to be transmitted for each MonitoredItem. - /// * `items_to_create` - A list of MonitoredItems to be created and assigned to the specified Subscription. + /// * `subscription_id` - subscription identifier returned from `create_subscription`. /// /// # Returns /// - /// * `Ok(Vec)` - List of results for the MonitoredItems to create. - /// The size and order of the list matches the size and order of the `items_to_create` request parameter. + /// * `Ok(StatusCode)` - Service return code for the delete action, `Good` or `BadSubscriptionIdInvalid` /// * `Err(StatusCode)` - Status code reason for failure /// - /// [`CreateMonitoredItemsRequest`]: ./struct.CreateMonitoredItemsRequest.html + /// [`DeleteSubscriptionsRequest`]: ./struct.DeleteSubscriptionsRequest.html /// - pub fn create_monitored_items(&mut self, subscription_id: u32, timestamps_to_return: TimestampsToReturn, items_to_create: &[MonitoredItemCreateRequest]) -> Result, StatusCode> { - debug!("create_monitored_items, for subscription {}, {} items", subscription_id, items_to_create.len()); + pub fn delete_subscription(&mut self, subscription_id: u32) -> Result { if subscription_id == 0 { - error!("create_monitored_items, subscription id 0 is invalid"); + error!("delete_subscription, subscription id 0 is invalid"); Err(StatusCode::BadInvalidArgument) } else if !self.subscription_exists(subscription_id) { - error!("create_monitored_items, subscription id {} does not exist", subscription_id); + error!("delete_subscription, subscription id {} does not exist", subscription_id); Err(StatusCode::BadInvalidArgument) - } else if items_to_create.is_empty() { - error!("create_monitored_items, called with no items to create"); - Err(StatusCode::BadNothingToDo) } else { - // Assign each item a unique client handle - let mut items_to_create = items_to_create.to_vec(); - { - let mut session_state = trace_write_lock_unwrap!(self.session_state); - items_to_create.iter_mut().for_each(|i| { - i.requested_parameters.client_handle = session_state.next_monitored_item_handle(); - }); - } - - let request = CreateMonitoredItemsRequest { - request_header: self.make_request_header(), - subscription_id, - timestamps_to_return, - items_to_create: Some(items_to_create.clone()), - }; - let response = self.send_request(request)?; - if let SupportedMessage::CreateMonitoredItemsResponse(response) = response { - crate::process_service_result(&response.response_header)?; - if let Some(ref results) = response.results { - debug!("create_monitored_items, {} items created", items_to_create.len()); - // Set the items in our internal state - let items_to_create = items_to_create.iter() - .zip(results) - .map(|(i, r)| { - subscription::CreateMonitoredItem { - id: r.monitored_item_id, - client_handle: i.requested_parameters.client_handle, - discard_oldest: i.requested_parameters.discard_oldest, - item_to_monitor: i.item_to_monitor.clone(), - monitoring_mode: i.monitoring_mode, - queue_size: r.revised_queue_size, - sampling_interval: r.revised_sampling_interval, - } - }) - .collect::>(); - { - let mut subscription_state = trace_write_lock_unwrap!(self.subscription_state); - subscription_state.insert_monitored_items(subscription_id, &items_to_create); - } - } else { - debug!("create_monitored_items, success but no monitored items were created"); - } - Ok(response.results.unwrap()) - } else { - error!("create_monitored_items failed {:?}", response); - Err(crate::process_unexpected_response(response)) - } + let result = self.delete_subscriptions(&[subscription_id][..])?; + Ok(result[0]) } } - /// Modifies monitored items on a subscription by sending a [`ModifyMonitoredItemsRequest`] to the server. - /// - /// # Arguments - /// - /// * `subscription_id` - The Server-assigned identifier for the Subscription that will report Notifications for this MonitoredItem. - /// * `timestamps_to_return` - An enumeration that specifies the timestamp Attributes to be transmitted for each MonitoredItem. - /// * `items_to_modify` - The list of MonitoredItems to modify. - /// - /// # Returns - /// - /// * `Ok(Vec)` - List of results for the MonitoredItems to modify. - /// The size and order of the list matches the size and order of the `items_to_modify` request parameter. - /// * `Err(StatusCode)` - Status code reason for failure. + /// Deletes subscriptions by sending a [`DeleteSubscriptionsRequest`] to the server with the list + /// of subscriptions to delete. /// - /// [`ModifyMonitoredItemsRequest`]: ./struct.ModifyMonitoredItemsRequest.html - /// - pub fn modify_monitored_items(&mut self, subscription_id: u32, timestamps_to_return: TimestampsToReturn, items_to_modify: &[MonitoredItemModifyRequest]) -> Result, StatusCode> { - debug!("modify_monitored_items, for subscription {}, {} items", subscription_id, items_to_modify.len()); - if subscription_id == 0 { - error!("modify_monitored_items, subscription id 0 is invalid"); - Err(StatusCode::BadInvalidArgument) - } else if !self.subscription_exists(subscription_id) { - error!("modify_monitored_items, subscription id {} does not exist", subscription_id); - Err(StatusCode::BadInvalidArgument) - } else if items_to_modify.is_empty() { - error!("modify_monitored_items, called with no items to modify"); - Err(StatusCode::BadNothingToDo) - } else { - let monitored_item_ids = items_to_modify.iter() - .map(|i| i.monitored_item_id) - .collect::>(); - let request = ModifyMonitoredItemsRequest { - request_header: self.make_request_header(), - subscription_id, - timestamps_to_return, - items_to_modify: Some(items_to_modify.to_vec()), - }; - let response = self.send_request(request)?; - if let SupportedMessage::ModifyMonitoredItemsResponse(response) = response { - crate::process_service_result(&response.response_header)?; - if let Some(ref results) = response.results { - // Set the items in our internal state - let items_to_modify = monitored_item_ids.iter() - .zip(results.iter()) - .map(|(id, r)| { - subscription::ModifyMonitoredItem { - id: *id, - queue_size: r.revised_queue_size, - sampling_interval: r.revised_sampling_interval, - } - }) - .collect::>(); - { - let mut subscription_state = trace_write_lock_unwrap!(self.subscription_state); - subscription_state.modify_monitored_items(subscription_id, &items_to_modify); - } - } - debug!("modify_monitored_items, success"); - Ok(response.results.unwrap()) - } else { - error!("modify_monitored_items failed {:?}", response); - Err(crate::process_unexpected_response(response)) - } - } - } - - /// Deletes monitored items from a subscription by sending a [`DeleteMonitoredItemsRequest`] to the server. + /// See OPC UA Part 4 - Services 5.13.8 for complete description of the service and error responses. /// /// # Arguments /// - /// * `subscription_id` - The Server-assigned identifier for the Subscription that will report Notifications for this MonitoredItem. - /// * `items_to_delete` - List of Server-assigned ids for the MonitoredItems to be deleted. + /// * `subscription_ids` - List of subscription identifiers to delete. /// /// # Returns /// - /// * `Ok(Vec)` - List of StatusCodes for the MonitoredItems to delete. The size and - /// order of the list matches the size and order of the `items_to_delete` request parameter. - /// * `Err(StatusCode)` - Status code reason for failure. + /// * `Ok(Vec)` - List of result for delete action on each id, `Good` or `BadSubscriptionIdInvalid` + /// The size and order of the list matches the size and order of the input. + /// * `Err(StatusCode)` - Status code reason for failure /// - /// [`DeleteMonitoredItemsRequest`]: ./struct.DeleteMonitoredItemsRequest.html + /// [`DeleteSubscriptionsRequest`]: ./struct.DeleteSubscriptionsRequest.html /// - pub fn delete_monitored_items(&mut self, subscription_id: u32, items_to_delete: &[u32]) -> Result, StatusCode> { - debug!("delete_monitored_items, subscription {} for {} items", subscription_id, items_to_delete.len()); - if subscription_id == 0 { - error!("delete_monitored_items, subscription id 0 is invalid"); - Err(StatusCode::BadInvalidArgument) - } else if !self.subscription_exists(subscription_id) { - error!("delete_monitored_items, subscription id {} does not exist", subscription_id); - Err(StatusCode::BadInvalidArgument) - } else if items_to_delete.is_empty() { - error!("delete_monitored_items, called with no items to delete"); + pub fn delete_subscriptions(&mut self, subscription_ids: &[u32]) -> Result, StatusCode> { + if subscription_ids.is_empty() { + // No subscriptions + trace!("delete_subscriptions with no subscriptions"); Err(StatusCode::BadNothingToDo) } else { - let request = DeleteMonitoredItemsRequest { + // Send a delete request holding all the subscription ides that we wish to delete + let request = DeleteSubscriptionsRequest { request_header: self.make_request_header(), - subscription_id, - monitored_item_ids: Some(items_to_delete.to_vec()), + subscription_ids: Some(subscription_ids.to_vec()), }; let response = self.send_request(request)?; - if let SupportedMessage::DeleteMonitoredItemsResponse(response) = response { + if let SupportedMessage::DeleteSubscriptionsResponse(response) = response { crate::process_service_result(&response.response_header)?; - if response.results.is_some() { + { + // Clear out deleted subscriptions, assuming the delete worked let mut subscription_state = trace_write_lock_unwrap!(self.subscription_state); - subscription_state.delete_monitored_items(subscription_id, items_to_delete); - } - debug!("delete_monitored_items, success"); - Ok(response.results.unwrap()) - } else { - error!("delete_monitored_items failed {:?}", response); - Err(crate::process_unexpected_response(response)) - } - } - } - - /// Sets the monitoring mode on one or more monitored items by sending a [`SetMonitoringModeRequest`] - /// to the server. - /// - /// # Arguments - /// - /// * `subscription_id` - the subscription identifier containing the monitored items to be modified. - /// * `monitoring_mode` - the monitored mode to apply to the monitored items - /// * `monitored_item_ids` - the monitored items to be modified - /// - /// # Returns - /// - /// * `Ok(Vec)` - Individual result for each monitored item. - /// * `Err(StatusCode)` - Status code reason for failure. - /// - /// [`SetMonitoringModeRequest`]: ./struct.SetMonitoringModeRequest.html - /// - pub fn set_monitoring_mode(&mut self, subscription_id: u32, monitoring_mode: MonitoringMode, monitored_item_ids: &[u32]) -> Result, StatusCode> { - if monitored_item_ids.is_empty() { - error!("set_monitoring_mode, called with nothing to do"); - Err(StatusCode::BadNothingToDo) - } else { - let request = { - let monitored_item_ids = Some(monitored_item_ids.to_vec()); - SetMonitoringModeRequest { - request_header: self.make_request_header(), - subscription_id, - monitoring_mode, - monitored_item_ids, + subscription_ids.iter().for_each(|id| { + let _ = subscription_state.delete_subscription(*id); + }); } - }; - let response = self.send_request(request)?; - if let SupportedMessage::SetMonitoringModeResponse(response) = response { + debug!("delete_subscriptions success"); Ok(response.results.unwrap()) } else { - error!("set_monitoring_mode failed {:?}", response); - Err(crate::process_unexpected_response(response)) - } - } - } - - /// Sets a monitored item so it becomes the trigger that causes other monitored items to send - /// change events in the same update. Sends a [`SetTriggeringRequest`] to the server. - /// Note that `items_to_remove` is applied before `items_to_add`. - /// - /// # Arguments - /// - /// * `subscription_id` - the subscription identifier containing the monitored item to be used as the trigger. - /// * `monitored_item_id` - the monitored item that is the trigger. - /// * `links_to_add` - zero or more items to be added to the monitored item's triggering list. - /// * `items_to_remove` - zero or more items to be removed from the monitored item's triggering list. - /// - /// # Returns - /// - /// * `Ok((Option>, Option>))` - Individual result for each item added / removed for the SetTriggering call. - /// * `Err(StatusCode)` - Status code reason for failure. - /// - /// [`SetTriggeringRequest`]: ./struct.SetTriggeringRequest.html - /// - pub fn set_triggering(&mut self, subscription_id: u32, triggering_item_id: u32, links_to_add: &[u32], links_to_remove: &[u32]) -> Result<(Option>, Option>), StatusCode> { - if links_to_add.is_empty() && links_to_remove.is_empty() { - error!("set_triggering, called with nothing to add or remove"); - Err(StatusCode::BadNothingToDo) - } else { - let request = { - let links_to_add = if links_to_add.is_empty() { None } else { Some(links_to_add.to_vec()) }; - let links_to_remove = if links_to_remove.is_empty() { None } else { Some(links_to_remove.to_vec()) }; - SetTriggeringRequest { - request_header: self.make_request_header(), - subscription_id, - triggering_item_id, - links_to_add, - links_to_remove, - } - }; - let response = self.send_request(request)?; - if let SupportedMessage::SetTriggeringResponse(response) = response { - // Update client side state - let mut subscription_state = trace_write_lock_unwrap!(self.subscription_state); - subscription_state.set_triggering(subscription_id, triggering_item_id, links_to_add, links_to_remove); - Ok((response.add_results, response.remove_results)) - } else { - error!("set_triggering failed {:?}", response); + error!("delete_subscriptions failed {:?}", response); Err(crate::process_unexpected_response(response)) } } } - - /// Calls a single method on an object on the server by sending a [`CallRequest`] to the server. - /// - /// # Arguments - /// - /// * `method` - The method to call. Note this function takes anything that can be turned into - /// a [`CallMethodRequest`] which includes a (`NodeId`, `NodeId`, `Option>`) - /// which refers to the object id, method id, and input arguments respectively. - /// * `items_to_delete` - List of Server-assigned ids for the MonitoredItems to be deleted. + /// Deletes all subscriptions by sending a [`DeleteSubscriptionsRequest`] to the server with + /// ids for all subscriptions. /// /// # Returns /// - /// * `Ok(CallMethodResult)` - Result for the Method call. - /// * `Err(StatusCode)` - Status code reason for failure. + /// * `Ok(Vec<(u32, StatusCode)>)` - List of (id, status code) result for delete action on each id, `Good` or `BadSubscriptionIdInvalid` + /// * `Err(StatusCode)` - Status code reason for failure /// - /// [`CallRequest`]: ./struct.CallRequest.html - /// [`CallMethodRequest`]: ./struct.CallMethodRequest.html + /// [`DeleteSubscriptionsRequest`]: ./struct.DeleteSubscriptionsRequest.html /// - pub fn call_method(&mut self, method: T) -> Result where T: Into { - debug!("call_method"); - let methods_to_call = Some(vec![method.into()]); - let request = CallRequest { - request_header: self.make_request_header(), - methods_to_call, + pub fn delete_all_subscriptions(&mut self) -> Result, StatusCode> { + let subscription_ids = { + let subscription_state = trace_read_lock_unwrap!(self.subscription_state); + subscription_state.subscription_ids() }; - let response = self.send_request(request)?; - if let SupportedMessage::CallResponse(response) = response { - if let Some(mut results) = response.results { - if results.len() != 1 { - error!("call_method, expecting a result from the call to the server, got {} results", results.len()); - Err(StatusCode::BadUnexpectedError) - } else { - Ok(results.remove(0)) - } - } else { - error!("call_method, expecting a result from the call to the server, got nothing"); - Err(StatusCode::BadUnexpectedError) - } + if let Some(ref subscription_ids) = subscription_ids { + let status_codes = self.delete_subscriptions(subscription_ids.as_slice())?; + // Return a list of (id, status_code) for each subscription + Ok(subscription_ids.iter().zip(status_codes).map(|(id, status_code)| (*id, status_code)).collect()) } else { - Err(crate::process_unexpected_response(response)) + // No subscriptions + trace!("delete_all_subscriptions, called when there are no subscriptions"); + Err(StatusCode::BadNothingToDo) } } - /// Calls GetMonitoredItems via call_method(), putting a sane interface on the input / output. - /// - /// # Arguments - /// - /// * `subscription_id` - Server allocated identifier for the subscription to return monitored items for. - /// - /// # Returns - /// - /// * `Ok((Vec, Vec))` - Result for call, consisting a list of (monitored_item_id, client_handle) - /// * `Err(StatusCode)` - Status code reason for failure. - /// - pub fn call_get_monitored_items(&mut self, subscription_id: u32) -> Result<(Vec, Vec), StatusCode> { - let args = Some(vec![Variant::from(subscription_id)]); - let object_id: NodeId = ObjectId::Server.into(); - let method_id: NodeId = MethodId::Server_GetMonitoredItems.into(); - let request: CallMethodRequest = (object_id, method_id, args).into(); - let response = self.call_method(request)?; - if let Some(mut result) = response.output_arguments { - if result.len() == 2 { - let server_handles = result.remove(0).as_u32_array().map_err(|_| StatusCode::BadUnexpectedError)?; - let client_handles = result.remove(0).as_u32_array().map_err(|_| StatusCode::BadUnexpectedError)?; - Ok((server_handles, client_handles)) - } else { - error!("Expected a result with 2 args and didn't get it."); - Err(StatusCode::BadUnexpectedError) - } - } else { - error!("Expected a result and didn't get it."); - Err(StatusCode::BadUnexpectedError) - } + /// Returns the subscription state object + pub fn subscription_state(&self) -> Arc> { + self.subscription_state.clone() } // Test if the subscription by id exists diff --git a/client/src/session_state.rs b/client/src/session_state.rs index c9014d3c0..7fb07e62e 100644 --- a/client/src/session_state.rs +++ b/client/src/session_state.rs @@ -191,7 +191,6 @@ impl SessionState { } /// Sends a publish request containing acknowledgements for previous notifications. - /// TODO this function needs to be refactored as an asynchronous operation. pub fn async_publish(&mut self, subscription_acknowledgements: &[SubscriptionAcknowledgement]) -> Result { debug!("async_publish with {} subscription acknowledgements", subscription_acknowledgements.len()); let request = PublishRequest { diff --git a/core/src/crypto/thumbprint.rs b/core/src/crypto/thumbprint.rs index e66833a5c..df38f256b 100644 --- a/core/src/crypto/thumbprint.rs +++ b/core/src/crypto/thumbprint.rs @@ -22,7 +22,7 @@ impl Thumbprint { /// Constructs a thumbprint from a message digest which is expected to be the proper length pub fn new(digest: &[u8]) -> Thumbprint { if digest.len() != Thumbprint::THUMBPRINT_SIZE { - panic!("Thumbprint is not the right length"); + panic!("Thumbprint is the wrong length, {}", digest.len()); } let mut value = [0u8; Thumbprint::THUMBPRINT_SIZE]; value.clone_from_slice(digest); diff --git a/samples/discovery-client/Cargo.toml b/samples/discovery-client/Cargo.toml index b76c0e581..99427f446 100644 --- a/samples/discovery-client/Cargo.toml +++ b/samples/discovery-client/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Adam Lock "] edition = "2018" [dependencies] -clap = "2.27" +clap = "2.33" [dependencies.opcua-types] path = "../../types" diff --git a/samples/gfx-client/Cargo.toml b/samples/gfx-client/Cargo.toml index 2c3a27a92..a193a5557 100644 --- a/samples/gfx-client/Cargo.toml +++ b/samples/gfx-client/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Adam Lock "] edition = "2018" [dependencies] -clap = "2.27" +clap = "2.33" conrod = { version = "0.61", features = ["winit", "glium"] } [dependencies.opcua-client] diff --git a/samples/mqtt-client/Cargo.toml b/samples/mqtt-client/Cargo.toml index c015faed9..79da77476 100644 --- a/samples/mqtt-client/Cargo.toml +++ b/samples/mqtt-client/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Adam Lock "] edition = "2018" [dependencies] -clap = "2.27" +clap = "2.33" # This is a completely arbitrary snapshot of rumqtt that happens to work rumqtt = { git = "https://github.com/AtherEnergy/rumqtt.git", rev = "83b4694525061e2ccef617c0ac867db2044cc4e7" } diff --git a/samples/simple-client/Cargo.toml b/samples/simple-client/Cargo.toml index 1d8f66bc5..081c970c6 100644 --- a/samples/simple-client/Cargo.toml +++ b/samples/simple-client/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Adam Lock "] edition = "2018" [dependencies] -clap = "2.27" +clap = "2.33" [dependencies.opcua-client] path = "../../client" diff --git a/server/src/callbacks.rs b/server/src/callbacks.rs new file mode 100644 index 000000000..ec6752a84 --- /dev/null +++ b/server/src/callbacks.rs @@ -0,0 +1,31 @@ +//! Callbacks that a server implementation may register with the library + +use std::sync::{Arc, RwLock}; + +use opcua_types::{ + NodeId, + status_code::StatusCode, +}; + +use crate::session::Session; + +/// Called by RegisterNodes service +pub trait OnRegisterNodes { + /// Called when a client calls the RegisterNodes service. This implementation should return a list + /// of the same size and order containing node ids corresponding to the input, or aliases. The implementation + /// should return `BadNodeIdInvalid` if any of the node ids in the input are invalid. + /// + /// The call is also given the session that the request was made on. The implementation should + /// NOT hold a strong reference to this session, but it can make a weak reference if it desires. + /// + /// There is no guarantee that the corresponding `OnUnregisterNodes` will be called by the client, + /// therefore use the weak session references and a periodic check to perform any housekeeping. + fn on_register_nodes(&mut self, session: Arc>, nodes_to_register: &[NodeId]) -> Result, StatusCode>; +} + +/// Called by UnregisterNodes service +pub trait OnUnregisterNodes { + /// Called when a client calls the UnregisterNodes service. See `OnRegisterNodes` trait for more + /// information. + fn on_unregister_nodes(&mut self, session: Arc>, nodes_to_unregister: &[NodeId]) -> Result<(), StatusCode>; +} diff --git a/server/src/lib.rs b/server/src/lib.rs index de41c3029..a78694810 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -88,6 +88,7 @@ pub mod util; pub mod continuation_point; #[cfg(feature = "http")] pub mod http; +pub mod callbacks; pub mod prelude { //! Provides a way to use most types and functions commonly used by server implementations from a @@ -96,10 +97,11 @@ pub mod prelude { pub use opcua_types::service_types::*; pub use opcua_core::prelude::*; pub use crate::{ + address_space::types::*, + builder::*, + callbacks::*, config::*, server::*, - builder::*, - address_space::types::*, subscriptions::*, util::*, }; diff --git a/server/src/server.rs b/server/src/server.rs index db8936cad..44145d954 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -135,6 +135,8 @@ impl Server { max_browse_paths_per_translate: constants::MAX_BROWSE_PATHS_PER_TRANSLATE, diagnostics, abort: false, + on_register_nodes_callback: None, + on_unregister_nodes_callback: None, }; let server_state = Arc::new(RwLock::new(server_state)); diff --git a/server/src/session.rs b/server/src/session.rs index 2be595810..a82e96d1b 100644 --- a/server/src/session.rs +++ b/server/src/session.rs @@ -123,6 +123,7 @@ impl Session { session } + /// Create a `Session` from a `Server` pub fn new(server: &Server) -> Session { let max_browse_continuation_points = super::constants::MAX_BROWSE_CONTINUATION_POINTS; diff --git a/server/src/state.rs b/server/src/state.rs index 79ffbe4b8..bf06f7d31 100644 --- a/server/src/state.rs +++ b/server/src/state.rs @@ -3,14 +3,21 @@ use std::sync::{Arc, RwLock}; use opcua_core::prelude::*; -use opcua_types::node_ids::ObjectId; -use opcua_types::profiles; -use opcua_types::service_types::{ApplicationDescription, RegisteredServer, ApplicationType, EndpointDescription, UserNameIdentityToken, UserTokenPolicy, UserTokenType, X509IdentityToken}; -use opcua_types::service_types::ServerState as ServerStateType; -use opcua_types::status_code::StatusCode; + +use opcua_types::{ + node_ids::ObjectId, + profiles, + service_types::{ + ApplicationDescription, RegisteredServer, ApplicationType, EndpointDescription, + UserNameIdentityToken, UserTokenPolicy, UserTokenType, X509IdentityToken, + ServerState as ServerStateType, + }, + status_code::StatusCode, +}; use crate::config::{ServerConfig, ServerEndpoint}; use crate::diagnostics::ServerDiagnostics; +use crate::callbacks::*; const TOKEN_POLICY_ANONYMOUS: &str = "anonymous"; const TOKEN_POLICY_USER_PASS_PLAINTEXT: &str = "userpass_plaintext"; @@ -63,6 +70,11 @@ pub struct ServerState { pub abort: bool, /// Diagnostic information pub diagnostics: Arc>, + /// Callback for register nodes + pub(crate) on_register_nodes_callback: Option>, + /// Callback for unregister nodes + pub(crate) on_unregister_nodes_callback: Option>, + } impl ServerState { @@ -305,6 +317,11 @@ impl ServerState { } } + pub fn set_register_nodes_callbacks(&mut self, register_nodes_callback: Box, unregister_nodes_callback: Box) { + self.on_register_nodes_callback = Some(register_nodes_callback); + self.on_unregister_nodes_callback = Some(unregister_nodes_callback); + } + /// Authenticates an anonymous token, i.e. does the endpoint support anonymous access or not fn authenticate_anonymous_token(endpoint: &ServerEndpoint) -> StatusCode { if endpoint.supports_anonymous() { diff --git a/server/src/subscriptions/subscription.rs b/server/src/subscriptions/subscription.rs index 4292f3315..051aab660 100644 --- a/server/src/subscriptions/subscription.rs +++ b/server/src/subscriptions/subscription.rs @@ -95,7 +95,6 @@ impl UpdateStateResult { #[derive(Debug, Copy, Clone, PartialEq)] pub(crate) enum TickReason { ReceivePublishRequest, - // PublishingTimerExpires, TickTimerFired, } diff --git a/tools/certificate-creator/Cargo.toml b/tools/certificate-creator/Cargo.toml index 875b09a41..9e45f76c9 100644 --- a/tools/certificate-creator/Cargo.toml +++ b/tools/certificate-creator/Cargo.toml @@ -10,7 +10,7 @@ categories = ["embedded","network-programming"] edition = "2018" [dependencies] -clap = "2.27" +clap = "2.33" [dependencies.opcua-types] path = "../../types" diff --git a/types/src/lib.rs b/types/src/lib.rs index 771adc06a..2b1c56112 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -21,7 +21,7 @@ macro_rules! supported_message_as { if let SupportedMessage::$i(value) = $v { *value } else { - panic!("Failed to get a supported message of type {}", stringify!($i)); + panic!(); } } }