From 2e26b1914ad13c3d59e0a04828b6f5bdadf84911 Mon Sep 17 00:00:00 2001 From: Adam Lock Date: Thu, 5 Aug 2021 11:43:45 +0100 Subject: [PATCH] Change most of Session functions to take immutable &self --- client/src/client.rs | 6 +- client/src/comms/tcp_transport.rs | 4 +- client/src/session/services.rs | 84 ++++++++-------- client/src/session/session.rs | 153 +++++++++++++++--------------- integration/src/harness.rs | 4 +- samples/event-client/src/main.rs | 2 +- samples/mqtt-client/src/main.rs | 2 +- samples/simple-client/src/main.rs | 2 +- samples/web-client/src/main.rs | 8 +- 9 files changed, 130 insertions(+), 135 deletions(-) diff --git a/client/src/client.rs b/client/src/client.rs index b7a0367e9..729383673 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -464,7 +464,7 @@ impl Client { user_identity_token: IdentityToken::Anonymous, preferred_locales, }; - let mut session = Session::new( + let session = Session::new( self.application_description(), self.config.session_name.clone(), self.certificate_store.clone(), @@ -497,7 +497,7 @@ impl Client { let endpoint = EndpointDescription::from(discovery_endpoint_url.as_ref()); let session = self.new_session_from_info(endpoint); if let Ok(session) = session { - let mut session = trace_write_lock_unwrap!(session); + let session = trace_read_lock_unwrap!(session); // Connect & activate the session. let connected = session.connect(); if connected.is_ok() { @@ -578,7 +578,7 @@ impl Client { ); let session = self.new_session_from_info(endpoint.clone()); if let Ok(session) = session { - let mut session = trace_write_lock_unwrap!(session); + let session = trace_read_lock_unwrap!(session); match session.connect() { Ok(_) => { // Register with the server diff --git a/client/src/comms/tcp_transport.rs b/client/src/comms/tcp_transport.rs index a7b49e2c2..d33e27960 100644 --- a/client/src/comms/tcp_transport.rs +++ b/client/src/comms/tcp_transport.rs @@ -276,7 +276,7 @@ impl TcpTransport { } /// Connects the stream to the specified endpoint - pub fn connect(&mut self, endpoint_url: &str) -> Result<(), StatusCode> { + pub fn connect(&self, endpoint_url: &str) -> Result<(), StatusCode> { if self.is_connected() { panic!("Should not try to connect when already connected"); } @@ -359,7 +359,7 @@ impl TcpTransport { } /// Disconnects the stream from the server (if it is connected) - pub fn wait_for_disconnect(&mut self) { + pub fn wait_for_disconnect(&self) { debug!("Waiting for a disconnect"); loop { if self.connection_state.is_finished() { diff --git a/client/src/session/services.rs b/client/src/session/services.rs index 54d2ebed5..d02e00858 100644 --- a/client/src/session/services.rs +++ b/client/src/session/services.rs @@ -32,15 +32,15 @@ pub enum HistoryUpdateAction { } pub trait Service { - fn make_request_header(&mut self) -> RequestHeader; + fn make_request_header(&self) -> RequestHeader; /// Synchronously sends a request. The return value is the response to the request - fn send_request(&mut self, request: T) -> Result + fn send_request(&self, request: T) -> Result where T: Into; /// Asynchronously sends a request. The return value is the request handle of the request - fn async_send_request(&mut self, request: T, is_async: bool) -> Result + fn async_send_request(&self, request: T, is_async: bool) -> Result where T: Into; } @@ -63,10 +63,7 @@ pub trait DiscoveryService: Service { /// [`FindServersRequest`]: ./struct.FindServersRequest.html /// [`ApplicationDescription`]: ./struct.ApplicationDescription.html /// - fn find_servers( - &mut self, - endpoint_url: T, - ) -> Result, StatusCode> + fn find_servers(&self, endpoint_url: T) -> Result, StatusCode> where T: Into; @@ -81,7 +78,7 @@ pub trait DiscoveryService: Service { /// /// [`GetEndpointsRequest`]: ./struct.GetEndpointsRequest.html /// - fn get_endpoints(&mut self) -> Result, StatusCode>; + fn get_endpoints(&self) -> Result, StatusCode>; /// This function is used by servers that wish to register themselves with a discovery server. /// i.e. one server is the client to another server. The server sends a [`RegisterServerRequest`] @@ -101,7 +98,7 @@ pub trait DiscoveryService: Service { /// /// [`RegisterServerRequest`]: ./struct.RegisterServerRequest.html /// - fn register_server(&mut self, server: RegisteredServer) -> Result<(), StatusCode>; + fn register_server(&self, server: RegisteredServer) -> Result<(), StatusCode>; } /// SecureChannel Service set @@ -117,7 +114,7 @@ pub trait SecureChannelService: Service { /// /// [`OpenSecureChannelRequest`]: ./struct.OpenSecureChannelRequest.html /// - fn open_secure_channel(&mut self) -> Result<(), StatusCode>; + fn open_secure_channel(&self) -> Result<(), StatusCode>; /// Sends a [`CloseSecureChannelRequest`] to the server which will cause the server to drop /// the connection. @@ -131,7 +128,7 @@ pub trait SecureChannelService: Service { /// /// [`CloseSecureChannelRequest`]: ./struct.CloseSecureChannelRequest.html /// - fn close_secure_channel(&mut self) -> Result<(), StatusCode>; + fn close_secure_channel(&self) -> Result<(), StatusCode>; } /// Session Service set @@ -149,7 +146,7 @@ pub trait SessionService: Service { /// /// [`CreateSessionRequest`]: ./struct.CreateSessionRequest.html /// - fn create_session(&mut self) -> Result; + fn create_session(&self) -> Result; /// Sends an [`ActivateSessionRequest`] to the server to activate this session /// @@ -162,7 +159,7 @@ pub trait SessionService: Service { /// /// [`ActivateSessionRequest`]: ./struct.ActivateSessionRequest.html /// - fn activate_session(&mut self) -> Result<(), StatusCode>; + fn activate_session(&self) -> Result<(), StatusCode>; /// Cancels an outstanding service request by sending a [`CancelRequest`] to the server. /// @@ -179,7 +176,7 @@ pub trait SessionService: Service { /// /// [`CancelRequest`]: ./struct.CancelRequest.html /// - fn cancel(&mut self, request_handle: IntegerId) -> Result; + fn cancel(&self, request_handle: IntegerId) -> Result; } /// NodeManagement Service set @@ -202,10 +199,7 @@ pub trait NodeManagementService: Service { /// [`AddNodesItem`]: ./struct.AddNodesItem.html /// [`AddNodesResult`]: ./struct.AddNodesResult.html /// - fn add_nodes( - &mut self, - nodes_to_add: &[AddNodesItem], - ) -> Result, StatusCode>; + fn add_nodes(&self, nodes_to_add: &[AddNodesItem]) -> Result, StatusCode>; /// Add references by sending a [`AddReferencesRequest`] to the server. /// @@ -224,7 +218,7 @@ pub trait NodeManagementService: Service { /// [`AddReferencesItem`]: ./struct.AddReferencesItem.html /// fn add_references( - &mut self, + &self, references_to_add: &[AddReferencesItem], ) -> Result, StatusCode>; @@ -245,7 +239,7 @@ pub trait NodeManagementService: Service { /// [`DeleteNodesItem`]: ./struct.DeleteNodesItem.html /// fn delete_nodes( - &mut self, + &self, nodes_to_delete: &[DeleteNodesItem], ) -> Result, StatusCode>; @@ -266,7 +260,7 @@ pub trait NodeManagementService: Service { /// [`DeleteReferencesItem`]: ./struct.DeleteReferencesItem.html /// fn delete_references( - &mut self, + &self, references_to_delete: &[DeleteReferencesItem], ) -> Result, StatusCode>; } @@ -292,7 +286,7 @@ pub trait ViewService: Service { /// [`BrowseResult`]: ./struct.BrowseResult.html /// fn browse( - &mut self, + &self, nodes_to_browse: &[BrowseDescription], ) -> Result>, StatusCode>; @@ -317,7 +311,7 @@ pub trait ViewService: Service { /// [`BrowseResult`]: ./struct.BrowseResult.html /// fn browse_next( - &mut self, + &self, release_continuation_points: bool, continuation_points: &[ByteString], ) -> Result>, StatusCode>; @@ -344,7 +338,7 @@ pub trait ViewService: Service { /// [`BrowsePath`]: ./struct.BrowsePath.html /// [`BrowsePathResult`]: ./struct.BrowsePathResult.html fn translate_browse_paths_to_node_ids( - &mut self, + &self, browse_paths: &[BrowsePath], ) -> Result, StatusCode>; @@ -366,7 +360,7 @@ pub trait ViewService: Service { /// /// [`RegisterNodesRequest`]: ./struct.RegisterNodesRequest.html /// [`NodeId`]: ./struct.NodeId.html - fn register_nodes(&mut self, nodes_to_register: &[NodeId]) -> Result, StatusCode>; + fn register_nodes(&self, nodes_to_register: &[NodeId]) -> Result, StatusCode>; /// Unregister nodes on the server by sending a [`UnregisterNodesRequest`]. This indicates to /// the server that the client relinquishes any need for these nodes. The server will ignore @@ -386,7 +380,7 @@ pub trait ViewService: Service { /// [`UnregisterNodesRequest`]: ./struct.UnregisterNodesRequest.html /// [`NodeId`]: ./struct.NodeId.html /// - fn unregister_nodes(&mut self, nodes_to_unregister: &[NodeId]) -> Result<(), StatusCode>; + fn unregister_nodes(&self, nodes_to_unregister: &[NodeId]) -> Result<(), StatusCode>; } /// Attribute Service set @@ -408,7 +402,7 @@ pub trait AttributeService: Service { /// [`ReadValueId`]: ./struct.ReadValueId.html /// [`DataValue`]: ./struct.DataValue.html /// - fn read(&mut self, nodes_to_read: &[ReadValueId]) -> Result, StatusCode>; + fn read(&self, nodes_to_read: &[ReadValueId]) -> Result, StatusCode>; /// Reads historical values or events of one or more nodes. The caller is expected to provide /// a HistoryReadAction enum which must be one of the following: @@ -433,7 +427,7 @@ pub trait AttributeService: Service { /// * `Err(StatusCode)` - Status code reason for failure. /// fn history_read( - &mut self, + &self, history_read_details: HistoryReadAction, timestamps_to_return: TimestampsToReturn, release_continuation_points: bool, @@ -457,7 +451,7 @@ pub trait AttributeService: Service { /// [`WriteRequest`]: ./struct.WriteRequest.html /// [`WriteValue`]: ./struct.WriteValue.html /// - fn write(&mut self, nodes_to_write: &[WriteValue]) -> Result, StatusCode>; + fn write(&self, nodes_to_write: &[WriteValue]) -> Result, StatusCode>; /// Updates historical values. The caller is expected to provide one or more history update operations /// in a slice of HistoryUpdateAction enums which are one of the following: @@ -481,7 +475,7 @@ pub trait AttributeService: Service { /// * `Err(StatusCode)` - Status code reason for failure. /// fn history_update( - &mut self, + &self, history_update_details: &[HistoryUpdateAction], ) -> Result, StatusCode>; } @@ -507,7 +501,7 @@ pub trait MethodService: Service { /// [`CallMethodRequest`]: ./struct.CallMethodRequest.html /// [`CallMethodResult`]: ./struct.CallMethodResult.html /// - fn call(&mut self, method: T) -> Result + fn call(&self, method: T) -> Result where T: Into; @@ -523,7 +517,7 @@ pub trait MethodService: Service { /// * `Err(StatusCode)` - Status code reason for failure. /// fn call_get_monitored_items( - &mut self, + &self, subscription_id: u32, ) -> Result<(Vec, Vec), StatusCode> { let args = Some(vec![Variant::from(subscription_id)]); @@ -574,7 +568,7 @@ pub trait MonitoredItemService: Service { /// [`MonitoredItemCreateResult`]: ./struct.MonitoredItemCreateResult.html /// fn create_monitored_items( - &mut self, + &self, subscription_id: u32, timestamps_to_return: TimestampsToReturn, items_to_create: &[MonitoredItemCreateRequest], @@ -601,7 +595,7 @@ pub trait MonitoredItemService: Service { /// [`MonitoredItemModifyResult`]: ./struct.MonitoredItemModifyResult.html /// fn modify_monitored_items( - &mut self, + &self, subscription_id: u32, timestamps_to_return: TimestampsToReturn, items_to_modify: &[MonitoredItemModifyRequest], @@ -626,7 +620,7 @@ pub trait MonitoredItemService: Service { /// [`SetMonitoringModeRequest`]: ./struct.SetMonitoringModeRequest.html /// fn set_monitoring_mode( - &mut self, + &self, subscription_id: u32, monitoring_mode: MonitoringMode, monitored_item_ids: &[u32], @@ -653,7 +647,7 @@ pub trait MonitoredItemService: Service { /// [`SetTriggeringRequest`]: ./struct.SetTriggeringRequest.html /// fn set_triggering( - &mut self, + &self, subscription_id: u32, triggering_item_id: u32, links_to_add: &[u32], @@ -678,7 +672,7 @@ pub trait MonitoredItemService: Service { /// [`DeleteMonitoredItemsRequest`]: ./struct.DeleteMonitoredItemsRequest.html /// fn delete_monitored_items( - &mut self, + &self, subscription_id: u32, items_to_delete: &[u32], ) -> Result, StatusCode>; @@ -732,7 +726,7 @@ pub trait SubscriptionService: Service { /// [`CreateSubscriptionRequest`]: ./struct.CreateSubscriptionRequest.html /// fn create_subscription( - &mut self, + &self, publishing_interval: f64, lifetime_count: u32, max_keep_alive_count: u32, @@ -762,7 +756,7 @@ pub trait SubscriptionService: Service { /// [`ModifySubscriptionRequest`]: ./struct.ModifySubscriptionRequest.html /// fn modify_subscription( - &mut self, + &self, subscription_id: u32, publishing_interval: f64, lifetime_count: u32, @@ -789,7 +783,7 @@ pub trait SubscriptionService: Service { /// [`SetPublishingModeRequest`]: ./struct.SetPublishingModeRequest.html /// fn set_publishing_mode( - &mut self, + &self, subscription_ids: &[u32], publishing_enabled: bool, ) -> Result, StatusCode>; @@ -816,7 +810,7 @@ pub trait SubscriptionService: Service { /// [`TransferResult`]: ./struct.TransferResult.html /// fn transfer_subscriptions( - &mut self, + &self, subscription_ids: &[u32], send_initial_values: bool, ) -> Result, StatusCode>; @@ -836,7 +830,7 @@ pub trait SubscriptionService: Service { /// /// [`DeleteSubscriptionsRequest`]: ./struct.DeleteSubscriptionsRequest.html /// - fn delete_subscription(&mut self, subscription_id: u32) -> Result; + fn delete_subscription(&self, subscription_id: u32) -> Result; /// Deletes subscriptions by sending a [`DeleteSubscriptionsRequest`] to the server with the list /// of subscriptions to delete. @@ -855,8 +849,6 @@ pub trait SubscriptionService: Service { /// /// [`DeleteSubscriptionsRequest`]: ./struct.DeleteSubscriptionsRequest.html /// - fn delete_subscriptions( - &mut self, - subscription_ids: &[u32], - ) -> Result, StatusCode>; + fn delete_subscriptions(&self, subscription_ids: &[u32]) + -> Result, StatusCode>; } diff --git a/client/src/session/session.rs b/client/src/session/session.rs index be932eee7..942f36860 100644 --- a/client/src/session/session.rs +++ b/client/src/session/session.rs @@ -106,10 +106,6 @@ pub enum SessionCommand { Stop, } -pub struct SessionHandle { - session: Arc>, -} - /// A session of the client. The session is associated with an endpoint and maintains a state /// when it is active. The `Session` struct provides functions for all the supported /// request types in the API. @@ -138,7 +134,7 @@ pub struct Session { /// Message queue. message_queue: Arc>, /// Session retry policy. - session_retry_policy: SessionRetryPolicy, + session_retry_policy: Arc>, /// Ignore clock skew between the client and the server. ignore_clock_skew: bool, /// Single threaded executor flag (for TCP transport) @@ -220,7 +216,7 @@ impl Session { transport, secure_channel, message_queue, - session_retry_policy, + session_retry_policy: Arc::new(Mutex::new(session_retry_policy)), ignore_clock_skew, single_threaded_executor, runtime: Arc::new(Mutex::new(runtime)), @@ -275,7 +271,7 @@ impl Session { /// * `session_retry_policy` - the session retry policy to use /// pub fn set_session_retry_policy(&mut self, session_retry_policy: SessionRetryPolicy) { - self.session_retry_policy = session_retry_policy; + self.session_retry_policy = Arc::new(Mutex::new(session_retry_policy)); } /// Register a callback to be notified when the session has been closed. @@ -473,34 +469,33 @@ impl Session { /// Connects to the server using the retry policy to repeat connecting until such time as it /// succeeds or the policy says to give up. If there is a failure, it will be /// communicated by the status code in the result. - pub fn connect(&mut self) -> Result<(), StatusCode> { + pub fn connect(&self) -> Result<(), StatusCode> { loop { match self.connect_no_retry() { Ok(_) => { info!("Connect was successful"); - self.session_retry_policy.reset_retry_count(); + let mut session_retry_policy = trace_lock_unwrap!(self.session_retry_policy); + session_retry_policy.reset_retry_count(); return Ok(()); } Err(status_code) => { - self.session_retry_policy.increment_retry_count(); + let mut session_retry_policy = trace_lock_unwrap!(self.session_retry_policy); + session_retry_policy.increment_retry_count(); session_warn!( self, "Connect was unsuccessful, error = {}, retries = {}", status_code, - self.session_retry_policy.retry_count() + session_retry_policy.retry_count() ); - match self - .session_retry_policy - .should_retry_connect(DateTime::now()) - { + match session_retry_policy.should_retry_connect(DateTime::now()) { Answer::GiveUp => { - session_error!(self, "Session has given up trying to connect to the server after {} retries", self.session_retry_policy.retry_count()); + session_error!(self, "Session has given up trying to connect to the server after {} retries", session_retry_policy.retry_count()); return Err(StatusCode::BadNotConnected); } Answer::Retry => { info!("Retrying to connect to server..."); - self.session_retry_policy.set_last_attempt(DateTime::now()); + session_retry_policy.set_last_attempt(DateTime::now()); } Answer::WaitFor(sleep_for) => { // Sleep for the instructed interval before looping around and trying @@ -522,7 +517,7 @@ impl Session { /// * `Ok(())` - connection has happened /// * `Err(StatusCode)` - reason for failure /// - pub fn connect_no_retry(&mut self) -> Result<(), StatusCode> { + pub fn connect_no_retry(&self) -> Result<(), StatusCode> { let endpoint_url = self.session_info.endpoint.endpoint_url.clone(); info!("Connect"); let security_policy = @@ -572,7 +567,7 @@ impl Session { /// Disconnect from the server. Disconnect is an explicit command to drop the socket and throw /// away all state information. If you disconnect you cannot reconnect to your existing session /// or retrieve any existing subscriptions. - pub fn disconnect(&mut self) { + pub fn disconnect(&self) { if self.is_connected() { let _ = self.delete_all_subscriptions(); let _ = self.close_secure_channel(); @@ -736,31 +731,42 @@ impl Session { let did_something = if self.is_connected() { self.handle_publish_responses() } else { - match self - .session_retry_policy - .should_retry_connect(DateTime::now()) - { + let should_retry_connect = { + let session_retry_policy = trace_lock_unwrap!(self.session_retry_policy); + session_retry_policy.should_retry_connect(DateTime::now()) + }; + match should_retry_connect { Answer::GiveUp => { + let session_retry_policy = trace_lock_unwrap!(self.session_retry_policy); session_error!( self, "Session has given up trying to reconnect to the server after {} retries", - self.session_retry_policy.retry_count() + session_retry_policy.retry_count() ); return Err(()); } Answer::Retry => { info!("Retrying to reconnect to server..."); - self.session_retry_policy.set_last_attempt(DateTime::now()); + { + let mut session_retry_policy = + trace_lock_unwrap!(self.session_retry_policy); + session_retry_policy.set_last_attempt(DateTime::now()); + } if self.reconnect_and_activate().is_ok() { info!("Retry to connect was successful"); - self.session_retry_policy.reset_retry_count(); + let mut session_retry_policy = + trace_lock_unwrap!(self.session_retry_policy); + session_retry_policy.reset_retry_count(); } else { - self.session_retry_policy.increment_retry_count(); + let mut session_retry_policy = + trace_lock_unwrap!(self.session_retry_policy); + session_retry_policy.increment_retry_count(); session_warn!( self, "Reconnect was unsuccessful, retries = {}", - self.session_retry_policy.retry_count() + session_retry_policy.retry_count() ); + drop(session_retry_policy); self.disconnect(); } true @@ -782,7 +788,7 @@ impl Session { /// connected to a server, negotiate a timeout period and then for whatever reason need to /// reconnect to that same server, you will receive the same timeout. If you get a different /// timeout then this code will not care and will continue to ping at the original rate. - fn spawn_session_activity_task(&mut self, session_timeout: f64) { + fn spawn_session_activity_task(&self, session_timeout: f64) { session_debug!(self, "spawn_session_activity_task({})", session_timeout); let connection_state = { @@ -854,7 +860,7 @@ impl Session { /// This is the internal handler for create subscription that receives the callback wrapped up and reference counted. fn create_subscription_inner( - &mut self, + &self, publishing_interval: f64, lifetime_count: u32, max_keep_alive_count: u32, @@ -920,7 +926,7 @@ impl Session { /// /// [`DeleteSubscriptionsRequest`]: ./struct.DeleteSubscriptionsRequest.html /// - pub fn delete_all_subscriptions(&mut self) -> Result, StatusCode> { + pub fn delete_all_subscriptions(&self) -> Result, StatusCode> { let subscription_ids = { let subscription_state = trace_read_lock_unwrap!(self.subscription_state); subscription_state.subscription_ids() @@ -956,7 +962,7 @@ impl Session { } /// Notify any callback of the connection status change - fn on_connection_status_change(&mut self, connected: bool) { + fn on_connection_status_change(&self, connected: bool) { let mut session_state = trace_write_lock_unwrap!(self.session_state); session_state.on_connection_status_change(connected); } @@ -1231,13 +1237,13 @@ impl Session { impl Service for Session { /// Construct a request header for the session. All requests after create session are expected /// to supply an authentication token. - fn make_request_header(&mut self) -> RequestHeader { + fn make_request_header(&self) -> RequestHeader { let mut session_state = trace_write_lock_unwrap!(self.session_state); session_state.make_request_header() } /// Synchronously sends a request. The return value is the response to the request - fn send_request(&mut self, request: T) -> Result + fn send_request(&self, request: T) -> Result where T: Into, { @@ -1246,7 +1252,7 @@ impl Service for Session { } /// Asynchronously sends a request. The return value is the request handle of the request - fn async_send_request(&mut self, request: T, is_async: bool) -> Result + fn async_send_request(&self, request: T, is_async: bool) -> Result where T: Into, { @@ -1256,10 +1262,7 @@ impl Service for Session { } impl DiscoveryService for Session { - fn find_servers( - &mut self, - endpoint_url: T, - ) -> Result, StatusCode> + fn find_servers(&self, endpoint_url: T) -> Result, StatusCode> where T: Into, { @@ -1283,7 +1286,7 @@ impl DiscoveryService for Session { } } - fn get_endpoints(&mut self) -> Result, StatusCode> { + fn get_endpoints(&self) -> Result, StatusCode> { session_debug!(self, "get_endpoints"); let endpoint_url = self.session_info.endpoint.endpoint_url.clone(); @@ -1313,7 +1316,7 @@ impl DiscoveryService for Session { } } - fn register_server(&mut self, server: RegisteredServer) -> Result<(), StatusCode> { + fn register_server(&self, server: RegisteredServer) -> Result<(), StatusCode> { let request = RegisterServerRequest { request_header: self.make_request_header(), server, @@ -1329,13 +1332,13 @@ impl DiscoveryService for Session { } impl SecureChannelService for Session { - fn open_secure_channel(&mut self) -> Result<(), StatusCode> { + fn open_secure_channel(&self) -> Result<(), StatusCode> { session_debug!(self, "open_secure_channel"); let mut session_state = trace_write_lock_unwrap!(self.session_state); session_state.issue_or_renew_secure_channel(SecurityTokenRequestType::Issue) } - fn close_secure_channel(&mut self) -> Result<(), StatusCode> { + fn close_secure_channel(&self) -> Result<(), StatusCode> { let request = CloseSecureChannelRequest { request_header: self.make_request_header(), }; @@ -1346,7 +1349,7 @@ impl SecureChannelService for Session { } impl SessionService for Session { - fn create_session(&mut self) -> Result { + fn create_session(&self) -> Result { // Get some state stuff let endpoint_url = self.session_info.endpoint.endpoint_url.clone(); @@ -1371,7 +1374,10 @@ impl SessionService for Session { }; // Requested session timeout should be larger than your expected subscription rate. - let requested_session_timeout = self.session_retry_policy.session_timeout(); + let requested_session_timeout = { + let session_retry_policy = trace_lock_unwrap!(self.session_retry_policy); + session_retry_policy.session_timeout() + }; let request = CreateSessionRequest { request_header: self.make_request_header(), @@ -1462,7 +1468,7 @@ impl SessionService for Session { } } - fn activate_session(&mut self) -> Result<(), StatusCode> { + fn activate_session(&self) -> Result<(), StatusCode> { let (user_identity_token, user_token_signature) = { let secure_channel = trace_read_lock_unwrap!(self.secure_channel); self.user_identity_token(&secure_channel.remote_cert(), secure_channel.remote_nonce())? @@ -1552,7 +1558,7 @@ impl SessionService for Session { } } - fn cancel(&mut self, request_handle: IntegerId) -> Result { + fn cancel(&self, request_handle: IntegerId) -> Result { let request = CancelRequest { request_header: self.make_request_header(), request_handle, @@ -1569,7 +1575,7 @@ impl SessionService for Session { impl SubscriptionService for Session { fn create_subscription( - &mut self, + &self, publishing_interval: f64, lifetime_count: u32, max_keep_alive_count: u32, @@ -1593,7 +1599,7 @@ impl SubscriptionService for Session { } fn modify_subscription( - &mut self, + &self, subscription_id: u32, publishing_interval: f64, lifetime_count: u32, @@ -1639,7 +1645,7 @@ impl SubscriptionService for Session { } fn set_publishing_mode( - &mut self, + &self, subscription_ids: &[u32], publishing_enabled: bool, ) -> Result, StatusCode> { @@ -1680,7 +1686,7 @@ impl SubscriptionService for Session { } fn transfer_subscriptions( - &mut self, + &self, subscription_ids: &[u32], send_initial_values: bool, ) -> Result, StatusCode> { @@ -1709,7 +1715,7 @@ impl SubscriptionService for Session { } } - fn delete_subscription(&mut self, subscription_id: u32) -> Result { + fn delete_subscription(&self, subscription_id: u32) -> Result { if subscription_id == 0 { session_error!(self, "delete_subscription, subscription id 0 is invalid"); Err(StatusCode::BadInvalidArgument) @@ -1727,7 +1733,7 @@ impl SubscriptionService for Session { } fn delete_subscriptions( - &mut self, + &self, subscription_ids: &[u32], ) -> Result, StatusCode> { if subscription_ids.is_empty() { @@ -1761,10 +1767,7 @@ impl SubscriptionService for Session { } impl NodeManagementService for Session { - fn add_nodes( - &mut self, - nodes_to_add: &[AddNodesItem], - ) -> Result, StatusCode> { + fn add_nodes(&self, nodes_to_add: &[AddNodesItem]) -> Result, StatusCode> { if nodes_to_add.is_empty() { session_error!(self, "add_nodes, called with no nodes to add"); Err(StatusCode::BadNothingToDo) @@ -1783,7 +1786,7 @@ impl NodeManagementService for Session { } fn add_references( - &mut self, + &self, references_to_add: &[AddReferencesItem], ) -> Result, StatusCode> { if references_to_add.is_empty() { @@ -1804,7 +1807,7 @@ impl NodeManagementService for Session { } fn delete_nodes( - &mut self, + &self, nodes_to_delete: &[DeleteNodesItem], ) -> Result, StatusCode> { if nodes_to_delete.is_empty() { @@ -1825,7 +1828,7 @@ impl NodeManagementService for Session { } fn delete_references( - &mut self, + &self, references_to_delete: &[DeleteReferencesItem], ) -> Result, StatusCode> { if references_to_delete.is_empty() { @@ -1851,7 +1854,7 @@ impl NodeManagementService for Session { impl MonitoredItemService for Session { fn create_monitored_items( - &mut self, + &self, subscription_id: u32, timestamps_to_return: TimestampsToReturn, items_to_create: &[MonitoredItemCreateRequest], @@ -1942,7 +1945,7 @@ impl MonitoredItemService for Session { } fn modify_monitored_items( - &mut self, + &self, subscription_id: u32, timestamps_to_return: TimestampsToReturn, items_to_modify: &[MonitoredItemModifyRequest], @@ -2011,7 +2014,7 @@ impl MonitoredItemService for Session { } fn set_monitoring_mode( - &mut self, + &self, subscription_id: u32, monitoring_mode: MonitoringMode, monitored_item_ids: &[u32], @@ -2040,7 +2043,7 @@ impl MonitoredItemService for Session { } fn set_triggering( - &mut self, + &self, subscription_id: u32, triggering_item_id: u32, links_to_add: &[u32], @@ -2088,7 +2091,7 @@ impl MonitoredItemService for Session { } fn delete_monitored_items( - &mut self, + &self, subscription_id: u32, items_to_delete: &[u32], ) -> Result, StatusCode> { @@ -2139,7 +2142,7 @@ impl MonitoredItemService for Session { impl ViewService for Session { fn browse( - &mut self, + &self, nodes_to_browse: &[BrowseDescription], ) -> Result>, StatusCode> { if nodes_to_browse.is_empty() { @@ -2169,7 +2172,7 @@ impl ViewService for Session { } fn browse_next( - &mut self, + &self, release_continuation_points: bool, continuation_points: &[ByteString], ) -> Result>, StatusCode> { @@ -2194,7 +2197,7 @@ impl ViewService for Session { } fn translate_browse_paths_to_node_ids( - &mut self, + &self, browse_paths: &[BrowsePath], ) -> Result, StatusCode> { if browse_paths.is_empty() { @@ -2224,7 +2227,7 @@ impl ViewService for Session { } } - fn register_nodes(&mut self, nodes_to_register: &[NodeId]) -> Result, StatusCode> { + fn register_nodes(&self, nodes_to_register: &[NodeId]) -> Result, StatusCode> { if nodes_to_register.is_empty() { session_error!( self, @@ -2248,7 +2251,7 @@ impl ViewService for Session { } } - fn unregister_nodes(&mut self, nodes_to_unregister: &[NodeId]) -> Result<(), StatusCode> { + fn unregister_nodes(&self, nodes_to_unregister: &[NodeId]) -> Result<(), StatusCode> { if nodes_to_unregister.is_empty() { session_error!( self, @@ -2274,7 +2277,7 @@ impl ViewService for Session { } impl MethodService for Session { - fn call(&mut self, method: T) -> Result + fn call(&self, method: T) -> Result where T: Into, { @@ -2311,7 +2314,7 @@ impl MethodService for Session { } impl AttributeService for Session { - fn read(&mut self, nodes_to_read: &[ReadValueId]) -> Result, StatusCode> { + fn read(&self, nodes_to_read: &[ReadValueId]) -> Result, StatusCode> { if nodes_to_read.is_empty() { // No subscriptions session_error!(self, "read(), was not supplied with any nodes to read"); @@ -2342,7 +2345,7 @@ impl AttributeService for Session { } fn history_read( - &mut self, + &self, history_read_details: HistoryReadAction, timestamps_to_return: TimestampsToReturn, release_continuation_points: bool, @@ -2399,7 +2402,7 @@ impl AttributeService for Session { } } - fn write(&mut self, nodes_to_write: &[WriteValue]) -> Result, StatusCode> { + fn write(&self, nodes_to_write: &[WriteValue]) -> Result, StatusCode> { if nodes_to_write.is_empty() { // No subscriptions session_error!(self, "write() was not supplied with any nodes to write"); @@ -2422,7 +2425,7 @@ impl AttributeService for Session { } fn history_update( - &mut self, + &self, history_update_details: &[HistoryUpdateAction], ) -> Result, StatusCode> { if history_update_details.is_empty() { diff --git a/integration/src/harness.rs b/integration/src/harness.rs index 8372df8e6..9fcff0139 100644 --- a/integration/src/harness.rs +++ b/integration/src/harness.rs @@ -485,7 +485,7 @@ pub fn regular_client_test( let session = client .connect_to_endpoint(client_endpoint, identity_token) .unwrap(); - let mut session = session.write().unwrap(); + let session = session.read().unwrap(); // Read the variable let mut values = { @@ -517,7 +517,7 @@ pub fn inactive_session_client_test( let session = client .connect_to_endpoint(client_endpoint, identity_token) .unwrap(); - let mut session = session.write().unwrap(); + let session = session.read().unwrap(); // Read the variable and expect that to fail let read_nodes = vec![ReadValueId::from(v1_node_id())]; diff --git a/samples/event-client/src/main.rs b/samples/event-client/src/main.rs index 2793c9a8d..19ad4570a 100644 --- a/samples/event-client/src/main.rs +++ b/samples/event-client/src/main.rs @@ -103,7 +103,7 @@ fn subscribe_to_events( event_source: &str, event_fields: &str, ) -> Result<(), StatusCode> { - let mut session = session.write().unwrap(); + let session = session.read().unwrap(); let event_fields: Vec = event_fields.split(',').map(|s| s.into()).collect(); diff --git a/samples/mqtt-client/src/main.rs b/samples/mqtt-client/src/main.rs index b595dde21..99eab07a8 100644 --- a/samples/mqtt-client/src/main.rs +++ b/samples/mqtt-client/src/main.rs @@ -133,7 +133,7 @@ fn subscription_loop( // This scope is important - we don't want to session to be locked when the code hits the // loop below { - let mut session = session.write().unwrap(); + let session = session.read().unwrap(); // Creates our subscription - one update every second. The update is sent as a message // to the MQTT thread to be published. diff --git a/samples/simple-client/src/main.rs b/samples/simple-client/src/main.rs index b7fd9239e..44e42256f 100644 --- a/samples/simple-client/src/main.rs +++ b/samples/simple-client/src/main.rs @@ -84,7 +84,7 @@ fn main() -> Result<(), ()> { } fn subscribe_to_variables(session: Arc>, ns: u16) -> Result<(), StatusCode> { - let mut session = session.write().unwrap(); + let session = session.read().unwrap(); // Creates a subscription with a data change callback let subscription_id = session.create_subscription( 2000.0, diff --git a/samples/web-client/src/main.rs b/samples/web-client/src/main.rs index 8540294c6..6bd61a0d8 100644 --- a/samples/web-client/src/main.rs +++ b/samples/web-client/src/main.rs @@ -238,12 +238,12 @@ impl OPCUASession { fn disconnect(&mut self, _ctx: &mut ::Context) { if let Some(ref mut session) = self.session { - let mut session = session.write().unwrap(); + let session = session.read().unwrap(); if session.is_connected() { session.disconnect(); } } - if let Some(mut tx) = self.session_tx.take() { + if let Some(tx) = self.session_tx.take() { let _ = tx.send(SessionCommand::Stop); } self.session = None; @@ -290,7 +290,7 @@ impl OPCUASession { let select_criteria = args.get(2).unwrap(); if let Some(ref mut session) = self.session { - let mut session = session.write().unwrap(); + let session = session.read().unwrap(); let event_node_id = NodeId::from_str(event_node_id); if event_node_id.is_err() { @@ -400,7 +400,7 @@ impl OPCUASession { // Create a subscription println!("Creating subscription"); - let mut session = session.write().unwrap(); + let session = session.read().unwrap(); // Creates our subscription let addr_for_datachange = ctx.address();