Skip to content

Commit

Permalink
Subscription refactor work
Browse files Browse the repository at this point in the history
  • Loading branch information
locka99 committed Apr 4, 2019
1 parent dd5c09b commit 473a79c
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 71 deletions.
175 changes: 166 additions & 9 deletions client/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
result::Result,
collections::HashSet,
str::FromStr,
sync::{Arc, Mutex, RwLock},
sync::{Arc, Mutex, RwLock, mpsc},
time::{Instant, Duration},
};
use futures::{
Expand Down Expand Up @@ -410,22 +410,43 @@ impl Session {
self.transport.is_connected()
}

const POLL_SLEEP_INTERVAL: u64 = 50;

/// Runs a polling loop for this session to perform periodic activity such as processing subscriptions,
/// as well as recovering from connection errors. The run command will break if the session is disconnected
/// and cannot be reestablished.
///
/// # Returns
///
/// * `Ok(bool)` - returns `true` if an action was performed, `false` if no action was performed
/// but polling slept for a little bit.
/// * `Err(StatusCode)` - Request failed, status code is the reason for failure
/// * `mpsc::Sender<()>` - A sender that allows the caller to send a single unity message to the
/// run loop to cause it to abort.
///
pub fn run(session: Arc<RwLock<Session>>) {
const POLL_SLEEP_INTERVAL: u64 = 50;
pub fn run(session: Arc<RwLock<Session>>) -> mpsc::Sender<()> {
let (tx, rx) = mpsc::channel();
Self::run_loop(session, Self::POLL_SLEEP_INTERVAL, rx);
tx
}

/// Runs the server asynchronously by spawning a thread for it to run on, allowing the calling
/// thread to proceed to do other things.
pub fn run_async(session: Arc<RwLock<Session>>) -> mpsc::Sender<()> {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
Self::run_loop(session, Self::POLL_SLEEP_INTERVAL, rx)
});
tx
}

/// Main running loop for a session
pub fn run_loop(session: Arc<RwLock<Session>>, sleep_interval: u64, rx: mpsc::Receiver<()>) {
loop {
// Main thread has nothing to do - just wait for publish events to roll in
let mut session = session.write().unwrap();
if session.poll(POLL_SLEEP_INTERVAL).is_err() {
if rx.try_recv().is_ok() {
info!("Run session was terminated by a message");
break;
}
if session.poll(sleep_interval).is_err() {
// Break the loop if connection goes down
info!("Connection to server broke, so terminating");
break;
Expand Down Expand Up @@ -944,7 +965,17 @@ impl Session {
}
}

/// Sends a ReadRequest to the server
/// Reads the value of nodes by sending a [`ReadRequest`] to the server.
///
/// # Arguments
///
/// * `nodes_to_read` - A list of `ReadValueId` to be read by the server.
///
/// # Returns
///
/// * `Ok(Vec<DataValue>)` - A list of datavalues corresponding to each read operation.
/// * `Err(StatusCode)` - Status code reason for failure.
///
pub fn read(&mut self, nodes_to_read: &[ReadValueId]) -> Result<Option<Vec<DataValue>>, StatusCode> {
if nodes_to_read.is_empty() {
// No subscriptions
Expand All @@ -970,7 +1001,17 @@ impl Session {
}
}

/// Sends a WriteRequest to the server
/// Writes values to nodes by sending a [`WriteRequest`] to the server.
///
/// # Arguments
///
/// * `nodes_to_write` - A list of `WriteValue` to be sent to the server.
///
/// # Returns
///
/// * `Ok(Vec<StatusCode>)` - A list of results corresponding to each write operation.
/// * `Err(StatusCode)` - Status code reason for failure.
///
pub fn write_value(&mut self, nodes_to_write: &[WriteValue]) -> Result<Option<Vec<StatusCode>>, StatusCode> {
if nodes_to_write.is_empty() {
// No subscriptions
Expand All @@ -993,6 +1034,122 @@ impl Session {
}
}

/// Add nodes by sending a [`AddNodesRequest`] to the server.
///
/// # Arguments
///
/// * `nodes_to_add` - A list of `AddNodesItem` to be added to the server.
///
/// # Returns
///
/// * `Ok(Vec<AddNodesResult>)` - Result for each add node operation.
/// * `Err(StatusCode)` - Status code reason for failure.
///
pub fn add_nodes(&mut self, nodes_to_add: &[AddNodesItem]) -> Result<Vec<AddNodesResult>, StatusCode> {
if nodes_to_add.is_empty() {
error!("add_nodes, called with no nodes to add");
Err(StatusCode::BadNothingToDo)
} else {
let request = AddNodesRequest {
request_header: self.make_request_header(),
nodes_to_add: Some(nodes_to_add.to_vec()),
};
let response = self.send_request(request)?;
if let SupportedMessage::AddNodesResponse(response) = response {
Ok(response.results.unwrap())
} else {
Err(crate::process_unexpected_response(response))
}
}
}

/// Add references by sending a [`AddReferencesRequest`] to the server.
///
/// # Arguments
///
/// * `references_to_add` - A list of `AddReferencesItem` to be sent to the server.
///
/// # Returns
///
/// * `Ok(Vec<StatusCode>)` - Result for each add reference operation.
/// * `Err(StatusCode)` - Status code reason for failure.
///
pub fn add_references(&mut self, references_to_add: &[AddReferencesItem]) -> Result<Vec<StatusCode>, StatusCode> {
if references_to_add.is_empty() {
error!("add_references, called with no references to add");
Err(StatusCode::BadNothingToDo)
} else {
let request = AddReferencesRequest {
request_header: self.make_request_header(),
references_to_add: Some(references_to_add.to_vec()),
};
let response = self.send_request(request)?;
if let SupportedMessage::AddReferencesResponse(response) = response {
Ok(response.results.unwrap())
} else {
Err(crate::process_unexpected_response(response))
}
}
}

/// Delete nodes by sending a [`DeleteNodesRequest`] to the server.
///
/// # Arguments
///
/// * `nodes_to_delete` - A list of `DeleteNodesItem` to be sent to the server.
///
/// # Returns
///
/// * `Ok(Vec<StatusCode>)` - Result for each delete node operation.
/// * `Err(StatusCode)` - Status code reason for failure.
///
pub fn delete_nodes(&mut self, nodes_to_delete: &[DeleteNodesItem]) -> Result<Vec<StatusCode>, StatusCode> {
if nodes_to_delete.is_empty() {
error!("delete_nodes, called with no nodes to delete");
Err(StatusCode::BadNothingToDo)
} else {
let request = DeleteNodesRequest {
request_header: self.make_request_header(),
nodes_to_delete: Some(nodes_to_delete.to_vec()),
};
let response = self.send_request(request)?;
if let SupportedMessage::DeleteNodesResponse(response) = response {
Ok(response.results.unwrap())
} else {
Err(crate::process_unexpected_response(response))
}
}
}

/// Delete references by sending a [`DeleteReferencesRequest`] to the server.
///
/// # Arguments
///
/// * `nodes_to_delete` - A list of `DeleteReferencesItem` to be sent to the server.
///
/// # Returns
///
/// * `Ok(Vec<StatusCode>)` - Result for each delete node operation.
/// * `Err(StatusCode)` - Status code reason for failure.
///
pub fn delete_references(&mut self, references_to_delete: &[DeleteReferencesItem]) -> Result<Vec<StatusCode>, StatusCode> {
if references_to_delete.is_empty() {
error!("delete_references, called with no references to delete");
Err(StatusCode::BadNothingToDo)
} else {
let request = DeleteReferencesRequest {
request_header: self.make_request_header(),
references_to_delete: Some(references_to_delete.to_vec()),
};
let response = self.send_request(request)?;
if let SupportedMessage::DeleteReferencesResponse(response) = response {
Ok(response.results.unwrap())
} else {
Err(crate::process_unexpected_response(response))
}
}
}

/// Create a subscription by sending a [`CreateSubscriptionRequest`] to the server.
///
/// # Arguments
Expand Down
11 changes: 9 additions & 2 deletions server/src/subscriptions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
use opcua_types::SupportedMessage;
use opcua_types::service_types::PublishRequest;
use opcua_types::{
SupportedMessage,
status_code::StatusCode,
service_types::PublishRequest,
};

/// The publish request entry preserves the request_id which is part of the chunk layer but clients
/// are fickle about receiving responses from the same as the request. Normally this is easy because
/// request and response are synchronous, but publish requests are async, so we preserve the request_id
/// so that later we can send out responses that have the proper req id
#[derive(Clone)]
pub struct PublishRequestEntry {
// The request id
pub request_id: u32,
// The request itself
pub request: PublishRequest,
// The result of clearing acknowledgments when the request was received.
pub results: Option<Vec<StatusCode>>,
}

#[derive(Clone, Debug, PartialEq)]
Expand Down
Loading

0 comments on commit 473a79c

Please sign in to comment.