Skip to content

Commit

Permalink
Merge branch 'subscription_refactor2'
Browse files Browse the repository at this point in the history
  • Loading branch information
locka99 committed Apr 8, 2019
2 parents dd4f89f + 72de061 commit 9d777ee
Show file tree
Hide file tree
Showing 28 changed files with 630 additions and 384 deletions.
2 changes: 1 addition & 1 deletion client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl Client {
};

info!("Server has these endpoints:");
endpoints.iter().for_each(|e| println!(" {} - {:?} / {:?}", e.endpoint_url,
endpoints.iter().for_each(|e| info!(" {} - {:?} / {:?}", e.endpoint_url,
SecurityPolicy::from_str(e.security_policy_uri.as_ref()).unwrap(),
e.security_mode));

Expand Down
191 changes: 181 additions & 10 deletions client/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
result::Result,
collections::HashSet,
str::FromStr,
sync::{Arc, Mutex, RwLock},
sync::{Arc, Mutex, RwLock, mpsc},
time::{Instant, Duration},
};
use futures::{
Expand Down Expand Up @@ -410,22 +410,56 @@ impl Session {
self.transport.is_connected()
}

const POLL_SLEEP_INTERVAL: u64 = 50;

/// Runs a polling loop for this session to perform periodic activity such as processing subscriptions,
/// as well as recovering from connection errors. The run command will break if the session is disconnected
/// and cannot be reestablished.
///
/// The `run()` function returns a `Sender` that can be used to send a `()` message to the session
/// to cause it to terminate.
///
/// # Returns
///
/// * `Ok(bool)` - returns `true` if an action was performed, `false` if no action was performed
/// but polling slept for a little bit.
/// * `Err(StatusCode)` - Request failed, status code is the reason for failure
/// * `mpsc::Sender<()>` - A sender that allows the caller to send a single unity message to the
/// run loop to cause it to abort.
///
pub fn run(session: Arc<RwLock<Session>>) -> mpsc::Sender<()> {
let (tx, rx) = mpsc::channel();
Self::run_loop(session, Self::POLL_SLEEP_INTERVAL, rx);
tx
}

/// Runs the server asynchronously on a new thread, allowing the calling
/// thread to continue do other things.
///
/// The `run()` function returns a `Sender` that can be used to send a `()` message to the session
/// to cause it to terminate.
///
pub fn run(session: Arc<RwLock<Session>>) {
const POLL_SLEEP_INTERVAL: u64 = 50;
/// # Returns
///
/// * `mpsc::Sender<()>` - A sender that allows the caller to send a single unity message to the
/// run loop to cause it to abort.
///
pub fn run_async(session: Arc<RwLock<Session>>) -> mpsc::Sender<()> {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
Self::run_loop(session, Self::POLL_SLEEP_INTERVAL, rx)
});
tx
}

/// The main running loop for a session. This is used by `run()` and `run_async()` to run
/// continuously until a signal is received to terminate.
fn run_loop(session: Arc<RwLock<Session>>, sleep_interval: u64, rx: mpsc::Receiver<()>) {
loop {
// Main thread has nothing to do - just wait for publish events to roll in
let mut session = session.write().unwrap();
if session.poll(POLL_SLEEP_INTERVAL).is_err() {
if rx.try_recv().is_ok() {
info!("Run session was terminated by a message");
break;
}
if session.poll(sleep_interval).is_err() {
// Break the loop if connection goes down
info!("Connection to server broke, so terminating");
break;
Expand Down Expand Up @@ -944,7 +978,17 @@ impl Session {
}
}

/// Sends a ReadRequest to the server
/// Reads the value of nodes by sending a [`ReadRequest`] to the server.
///
/// # Arguments
///
/// * `nodes_to_read` - A list of `ReadValueId` to be read by the server.
///
/// # Returns
///
/// * `Ok(Vec<DataValue>)` - A list of datavalues corresponding to each read operation.
/// * `Err(StatusCode)` - Status code reason for failure.
///
pub fn read(&mut self, nodes_to_read: &[ReadValueId]) -> Result<Option<Vec<DataValue>>, StatusCode> {
if nodes_to_read.is_empty() {
// No subscriptions
Expand All @@ -970,7 +1014,17 @@ impl Session {
}
}

/// Sends a WriteRequest to the server
/// Writes values to nodes by sending a [`WriteRequest`] to the server.
///
/// # Arguments
///
/// * `nodes_to_write` - A list of `WriteValue` to be sent to the server.
///
/// # Returns
///
/// * `Ok(Vec<StatusCode>)` - A list of results corresponding to each write operation.
/// * `Err(StatusCode)` - Status code reason for failure.
///
pub fn write_value(&mut self, nodes_to_write: &[WriteValue]) -> Result<Option<Vec<StatusCode>>, StatusCode> {
if nodes_to_write.is_empty() {
// No subscriptions
Expand All @@ -993,6 +1047,122 @@ impl Session {
}
}

/// Add nodes by sending a [`AddNodesRequest`] to the server.
///
/// # Arguments
///
/// * `nodes_to_add` - A list of `AddNodesItem` to be added to the server.
///
/// # Returns
///
/// * `Ok(Vec<AddNodesResult>)` - Result for each add node operation.
/// * `Err(StatusCode)` - Status code reason for failure.
///
pub fn add_nodes(&mut self, nodes_to_add: &[AddNodesItem]) -> Result<Vec<AddNodesResult>, StatusCode> {
if nodes_to_add.is_empty() {
error!("add_nodes, called with no nodes to add");
Err(StatusCode::BadNothingToDo)
} else {
let request = AddNodesRequest {
request_header: self.make_request_header(),
nodes_to_add: Some(nodes_to_add.to_vec()),
};
let response = self.send_request(request)?;
if let SupportedMessage::AddNodesResponse(response) = response {
Ok(response.results.unwrap())
} else {
Err(crate::process_unexpected_response(response))
}
}
}

/// Add references by sending a [`AddReferencesRequest`] to the server.
///
/// # Arguments
///
/// * `references_to_add` - A list of `AddReferencesItem` to be sent to the server.
///
/// # Returns
///
/// * `Ok(Vec<StatusCode>)` - Result for each add reference operation.
/// * `Err(StatusCode)` - Status code reason for failure.
///
pub fn add_references(&mut self, references_to_add: &[AddReferencesItem]) -> Result<Vec<StatusCode>, StatusCode> {
if references_to_add.is_empty() {
error!("add_references, called with no references to add");
Err(StatusCode::BadNothingToDo)
} else {
let request = AddReferencesRequest {
request_header: self.make_request_header(),
references_to_add: Some(references_to_add.to_vec()),
};
let response = self.send_request(request)?;
if let SupportedMessage::AddReferencesResponse(response) = response {
Ok(response.results.unwrap())
} else {
Err(crate::process_unexpected_response(response))
}
}
}

/// Delete nodes by sending a [`DeleteNodesRequest`] to the server.
///
/// # Arguments
///
/// * `nodes_to_delete` - A list of `DeleteNodesItem` to be sent to the server.
///
/// # Returns
///
/// * `Ok(Vec<StatusCode>)` - Result for each delete node operation.
/// * `Err(StatusCode)` - Status code reason for failure.
///
pub fn delete_nodes(&mut self, nodes_to_delete: &[DeleteNodesItem]) -> Result<Vec<StatusCode>, StatusCode> {
if nodes_to_delete.is_empty() {
error!("delete_nodes, called with no nodes to delete");
Err(StatusCode::BadNothingToDo)
} else {
let request = DeleteNodesRequest {
request_header: self.make_request_header(),
nodes_to_delete: Some(nodes_to_delete.to_vec()),
};
let response = self.send_request(request)?;
if let SupportedMessage::DeleteNodesResponse(response) = response {
Ok(response.results.unwrap())
} else {
Err(crate::process_unexpected_response(response))
}
}
}

/// Delete references by sending a [`DeleteReferencesRequest`] to the server.
///
/// # Arguments
///
/// * `nodes_to_delete` - A list of `DeleteReferencesItem` to be sent to the server.
///
/// # Returns
///
/// * `Ok(Vec<StatusCode>)` - Result for each delete node operation.
/// * `Err(StatusCode)` - Status code reason for failure.
///
pub fn delete_references(&mut self, references_to_delete: &[DeleteReferencesItem]) -> Result<Vec<StatusCode>, StatusCode> {
if references_to_delete.is_empty() {
error!("delete_references, called with no references to delete");
Err(StatusCode::BadNothingToDo)
} else {
let request = DeleteReferencesRequest {
request_header: self.make_request_header(),
references_to_delete: Some(references_to_delete.to_vec()),
};
let response = self.send_request(request)?;
if let SupportedMessage::DeleteReferencesResponse(response) = response {
Ok(response.results.unwrap())
} else {
Err(crate::process_unexpected_response(response))
}
}
}

/// Create a subscription by sending a [`CreateSubscriptionRequest`] to the server.
///
/// # Arguments
Expand Down Expand Up @@ -1041,7 +1211,8 @@ impl Session {
}

/// This is the internal handler for create subscription that receives the callback wrapped up and reference counted.
fn create_subscription_inner(&mut self, publishing_interval: f64, lifetime_count: u32, max_keep_alive_count: u32, max_notifications_per_publish: u32, priority: u8, publishing_enabled: bool,
fn create_subscription_inner(&mut self, publishing_interval: f64, lifetime_count: u32, max_keep_alive_count: u32, max_notifications_per_publish: u32,
priority: u8, publishing_enabled: bool,
callback: Arc<Mutex<dyn OnDataChange + Send + Sync + 'static>>)
-> Result<u32, StatusCode>
{
Expand Down
2 changes: 1 addition & 1 deletion samples/gfx-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ fn subscription_loop(nodes_to_monitor: Vec<ReadValueId>, session: Arc<RwLock<Ses
}

// Loops forever. The publish thread will call the callback with changes on the variables
Session::run(session);
let _ = Session::run(session);

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion samples/mqtt-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ fn subscription_loop(session: Arc<RwLock<Session>>, tx: mpsc::Sender<(NodeId, Da
}

// Loops forever. The publish thread will call the callback with changes on the variables
Session::run(session);
let _ = Session::run(session);

Ok(())
}
3 changes: 2 additions & 1 deletion samples/server.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ user_tokens:
unused_user:
user: unused
pass: unused1
discovery_url: ""
discovery_urls:
- "opc.tcp://127.0.0.1:4855/"
endpoints:
basic128rsa15_sign:
path: /
Expand Down
2 changes: 1 addition & 1 deletion samples/simple-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ fn subscription_loop(session: Arc<RwLock<Session>>) -> Result<(), StatusCode> {
}

// Loops forever. The publish thread will call the callback with changes on the variables
Session::run(session);
let _ = Session::run(session);

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion samples/web-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ fn subscription_loop(session: Arc<RwLock<Session>>) -> Result<(), StatusCode> {
}

// Loops forever. The publish thread will call the callback with changes on the variables
Session::run(session);
let _ = Session::run(session);

Ok(())
}
1 change: 0 additions & 1 deletion server/src/address_space/address_space.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use crate::{
state::ServerState,
session::Session,
constants,
DateTimeUtc,
};

/// Searches for the specified node by type, expecting it to exist
Expand Down
23 changes: 19 additions & 4 deletions server/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ impl ServerBuilder {
ServerBuilder::new()
.application_name(application_name)
.endpoint("none", ServerEndpoint::new_none(DEFAULT_ENDPOINT_PATH, &user_token_ids))
.discovery_urls(vec![
DEFAULT_ENDPOINT_PATH.into()
])
}

/// Sample mode turns on everything including a hard coded user/pass
Expand Down Expand Up @@ -67,6 +70,9 @@ impl ServerBuilder {
("basic256sha256_sign_encrypt", ServerEndpoint::new_basic256sha256_sign_encrypt(path, &user_token_ids)),
("no_access", ServerEndpoint::new_none("/noaccess", &[]))
])
.discovery_urls(vec![
DEFAULT_ENDPOINT_PATH.into()
])
}

/// Yields a [`Client`] from the values set by the builder. If the builder is not in a valid state
Expand All @@ -75,7 +81,7 @@ impl ServerBuilder {
/// [`Server`]: ../server/struct.Server.html
pub fn server(self) -> Option<Server> {
if self.is_valid() {
Some(Server::new(self.config))
Some(Server::new(self.config()))
} else {
None
}
Expand Down Expand Up @@ -157,9 +163,18 @@ impl ServerBuilder {
self
}

/// Discovery endpoint url - the url of this server used by clients to get endpoints.
pub fn discovery_url<T>(mut self, discovery_url: T) -> Self where T: Into<String> {
self.config.discovery_url = discovery_url.into();
/// Discovery endpoint urls - the urls of this server used by clients to get endpoints.
/// If the url is relative, e.g. "/" then the code will make a url for you using the port/host
/// settings as they are at the time this function is executed.
pub fn discovery_urls(mut self, discovery_urls: Vec<String>) -> Self {
self.config.discovery_urls = discovery_urls.iter().map(|discovery_url| {
if discovery_url.starts_with("/") {
// Turn into an opc url
format!("opc.tcp://{}:{}/", self.config.tcp_config.host, self.config.tcp_config.port)
} else {
discovery_url.clone()
}
}).collect();
self
}

Expand Down
Loading

0 comments on commit 9d777ee

Please sign in to comment.