Skip to content

Commit

Permalink
Clean up server side callbacks including Method service callback. Ren…
Browse files Browse the repository at this point in the history
…ame call_method() to call() client side. Change some tests to use a slightly cleaner test setup pattern.
  • Loading branch information
locka99 committed Apr 12, 2019
1 parent 77debb0 commit 2838691
Show file tree
Hide file tree
Showing 11 changed files with 250 additions and 226 deletions.
4 changes: 2 additions & 2 deletions client/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1444,7 +1444,7 @@ impl Session {
/// [`CallMethodRequest`]: ./struct.CallMethodRequest.html
/// [`CallMethodResult`]: ./struct.CallMethodResult.html
///
pub fn call_method<T>(&mut self, method: T) -> Result<CallMethodResult, StatusCode> where T: Into<CallMethodRequest> {
pub fn call<T>(&mut self, method: T) -> Result<CallMethodResult, StatusCode> where T: Into<CallMethodRequest> {
debug!("call_method");
let methods_to_call = Some(vec![method.into()]);
let request = CallRequest {
Expand Down Expand Up @@ -1485,7 +1485,7 @@ impl Session {
let object_id: NodeId = ObjectId::Server.into();
let method_id: NodeId = MethodId::Server_GetMonitoredItems.into();
let request: CallMethodRequest = (object_id, method_id, args).into();
let response = self.call_method(request)?;
let response = self.call(request)?;
if let Some(mut result) = response.output_arguments {
if result.len() == 2 {
let server_handles = result.remove(0).as_u32_array().map_err(|_| StatusCode::BadUnexpectedError)?;
Expand Down
2 changes: 1 addition & 1 deletion client/src/session_retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub enum Answer {
/// Give up reconnecting
GiveUp,
}

/// The session retry policy determines what to if the connection fails. In these circumstances,
/// the client needs to re-establish a connection and the policy says how many times to try between
/// failure and at what interval.
Expand Down
13 changes: 7 additions & 6 deletions server/src/address_space/address_space.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::{
diagnostics::ServerDiagnostics,
state::ServerState,
session::Session,
callbacks,
constants,
};

Expand Down Expand Up @@ -83,7 +84,7 @@ macro_rules! server_diagnostics_summary {
}
}

type MethodCallback = Box<dyn Fn(&AddressSpace, &ServerState, &mut Session, &CallMethodRequest) -> Result<CallMethodResult, StatusCode> + Send + Sync + 'static>;
type MethodCallback = Box<callbacks::Method + Send + Sync>;

#[derive(PartialEq, Eq, Clone, Debug, Hash)]
struct MethodKey {
Expand Down Expand Up @@ -329,8 +330,8 @@ impl AddressSpace {

// Server method handlers
use crate::address_space::method_impls;
self.register_method_handler(ObjectId::Server, MethodId::Server_GetMonitoredItems, Box::new(method_impls::handle_get_monitored_items));
self.register_method_handler(ObjectId::Server, MethodId::Server_ResendData, Box::new(method_impls::handle_resend_data));
self.register_method_handler(ObjectId::Server, MethodId::Server_ResendData, Box::new(method_impls::ServerResendDataMethod));
self.register_method_handler(ObjectId::Server, MethodId::Server_GetMonitoredItems, Box::new(method_impls::ServerGetMonitoredItemsMethod));
}
}

Expand Down Expand Up @@ -668,7 +669,7 @@ impl AddressSpace {
///
/// Calls require a registered handler to handle the method. If there is no handler, or if
/// the request refers to a non existent object / method, the function will return an error.
pub fn call_method(&self, server_state: &ServerState, session: &mut Session, request: &CallMethodRequest) -> Result<CallMethodResult, StatusCode> {
pub fn call_method(&mut self, _server_state: &ServerState, session: &mut Session, request: &CallMethodRequest) -> Result<CallMethodResult, StatusCode> {
let (object_id, method_id) = (&request.object_id, &request.method_id);

// Handle the call
Expand All @@ -689,10 +690,10 @@ impl AddressSpace {
object_id: object_id.clone(),
method_id: method_id.clone(),
};
if let Some(handler) = self.method_handlers.get(&key) {
if let Some(handler) = self.method_handlers.get_mut(&key) {
// Call the handler
trace!("Method call to {:?} on {:?} being handled by a registered handler", method_id, object_id);
handler(self, server_state, session, request)
handler.call(session, request)
} else {
// TODO we could do a secondary search on a (NodeId::null(), method_id) here
// so that method handler is reusable for multiple objects
Expand Down
121 changes: 64 additions & 57 deletions server/src/address_space/method_impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ use opcua_types::status_code::StatusCode;
use opcua_types::service_types::{CallMethodRequest, CallMethodResult};

use crate::{
address_space::AddressSpace,
state::ServerState,
session::Session
session::Session,
callbacks::Method,
};

/// Count the number of provided input arguments, comparing them to the expected number.
Expand Down Expand Up @@ -47,70 +46,78 @@ macro_rules! get_input_argument {
}

/// This is the handler for Server.ResendData method call.
pub fn handle_resend_data(_: &AddressSpace, _: &ServerState, session: &mut Session, request: &CallMethodRequest) -> Result<CallMethodResult, StatusCode> {
debug!("Method handler for ResendData");
pub struct ServerResendDataMethod;

// OPC UA part 5 - ResendData([in] UInt32 subscriptionId);
//
// subscriptionId - Identifier of the subscription to refresh
//
// Return codes
//
// BadSubscriptionIdInvalid
// BadUserAccessDenied
impl Method for ServerResendDataMethod {
fn call(&mut self, session: &mut Session, request: &CallMethodRequest) -> Result<CallMethodResult, StatusCode> {
debug!("Method handler for ResendData");

ensure_input_argument_count(request, 1)?;
// OPC UA part 5 - ResendData([in] UInt32 subscriptionId);
//
// subscriptionId - Identifier of the subscription to refresh
//
// Return codes
//
// BadSubscriptionIdInvalid
// BadUserAccessDenied

let subscription_id = get_input_argument!(request, 0, UInt32)?;
ensure_input_argument_count(request, 1)?;

if let Some(subscription) = session.subscriptions.get_mut(*subscription_id) {
subscription.set_resend_data();
Ok(CallMethodResult {
status_code: StatusCode::Good,
input_argument_results: Some(vec![StatusCode::Good]),
input_argument_diagnostic_infos: None,
output_arguments: None,
})
} else {
// Subscription id does not exist
// Note we could check other sessions for a matching id and return BadUserAccessDenied in that case
Err(StatusCode::BadSubscriptionIdInvalid)
let subscription_id = get_input_argument!(request, 0, UInt32)?;

if let Some(subscription) = session.subscriptions.get_mut(*subscription_id) {
subscription.set_resend_data();
Ok(CallMethodResult {
status_code: StatusCode::Good,
input_argument_results: Some(vec![StatusCode::Good]),
input_argument_diagnostic_infos: None,
output_arguments: None,
})
} else {
// Subscription id does not exist
// Note we could check other sessions for a matching id and return BadUserAccessDenied in that case
Err(StatusCode::BadSubscriptionIdInvalid)
}
}
}

/// This is the handler for the Server.GetMonitoredItems method call.
pub fn handle_get_monitored_items(_: &AddressSpace, _: &ServerState, session: &mut Session, request: &CallMethodRequest) -> Result<CallMethodResult, StatusCode> {
debug!("Method handler for GetMonitoredItems");
pub struct ServerGetMonitoredItemsMethod;

// OPC UA part 5 - GetMonitoredItems([in] UInt32 subscriptionId, [out] UInt32[] serverHandles, [out] UInt32[] clientHandles);
//
// subscriptionId - Identifier of the subscription
// serverHandles - Array of serverHandles for all MonitoredItems of the Subscription identified by subscriptionId
// clientHandles - Array of clientHandles for all MonitoredItems of the Subscription identified by subscriptionId
//
// Return codes
//
// BadSubscriptionIdInvalid
// BadUserAccessDenied
impl Method for ServerGetMonitoredItemsMethod {
fn call(&mut self, session: &mut Session, request: &CallMethodRequest) -> Result<CallMethodResult, StatusCode> {
debug!("Method handler for GetMonitoredItems");

ensure_input_argument_count(request, 1)?;
// OPC UA part 5 - GetMonitoredItems([in] UInt32 subscriptionId, [out] UInt32[] serverHandles, [out] UInt32[] clientHandles);
//
// subscriptionId - Identifier of the subscription
// serverHandles - Array of serverHandles for all MonitoredItems of the Subscription identified by subscriptionId
// clientHandles - Array of clientHandles for all MonitoredItems of the Subscription identified by subscriptionId
//
// Return codes
//
// BadSubscriptionIdInvalid
// BadUserAccessDenied

let subscription_id = get_input_argument!(request, 0, UInt32)?;
ensure_input_argument_count(request, 1)?;

if let Some(subscription) = session.subscriptions.subscriptions().get(&subscription_id) {
// Response
// serverHandles: Vec<u32>
// clientHandles: Vec<u32>
let (server_handles, client_handles) = subscription.get_handles();
Ok(CallMethodResult {
status_code: StatusCode::Good,
input_argument_results: Some(vec![StatusCode::Good]),
input_argument_diagnostic_infos: None,
output_arguments: Some(vec![server_handles.into(), client_handles.into()]),
})
} else {
// Subscription id does not exist
// Note we could check other sessions for a matching id and return BadUserAccessDenied in that case
Err(StatusCode::BadSubscriptionIdInvalid)
let subscription_id = get_input_argument!(request, 0, UInt32)?;

if let Some(subscription) = session.subscriptions.subscriptions().get(&subscription_id) {
// Response
// serverHandles: Vec<u32>
// clientHandles: Vec<u32>
let (server_handles, client_handles) = subscription.get_handles();
Ok(CallMethodResult {
status_code: StatusCode::Good,
input_argument_results: Some(vec![StatusCode::Good]),
input_argument_diagnostic_infos: None,
output_arguments: Some(vec![server_handles.into(), client_handles.into()]),
})
} else {
// Subscription id does not exist
// Note we could check other sessions for a matching id and return BadUserAccessDenied in that case
Err(StatusCode::BadSubscriptionIdInvalid)
}
}
}
}
17 changes: 13 additions & 4 deletions server/src/callbacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ use std::sync::{Arc, RwLock};
use opcua_types::{
NodeId,
status_code::StatusCode,
service_types::{CallMethodRequest, CallMethodResult},
};

use crate::session::Session;

/// Called by RegisterNodes service
pub trait OnRegisterNodes {
pub trait RegisterNodes {
/// Called when a client calls the RegisterNodes service. This implementation should return a list
/// of the same size and order containing node ids corresponding to the input, or aliases. The implementation
/// should return `BadNodeIdInvalid` if any of the node ids in the input are invalid.
Expand All @@ -20,16 +21,24 @@ pub trait OnRegisterNodes {
///
/// There is no guarantee that the corresponding `OnUnregisterNodes` will be called by the client,
/// therefore use the weak session references and a periodic check to perform any housekeeping.
fn on_register_nodes(&mut self, session: Arc<RwLock<Session>>, nodes_to_register: &[NodeId]) -> Result<Vec<NodeId>, StatusCode>;
fn register_nodes(&mut self, session: Arc<RwLock<Session>>, nodes_to_register: &[NodeId]) -> Result<Vec<NodeId>, StatusCode>;
}

/// Called by UnregisterNodes service
pub trait OnUnregisterNodes {
pub trait UnregisterNodes {
/// Called when a client calls the UnregisterNodes service. See `OnRegisterNodes` trait for more
/// information. A client may not call this function, e.g. if connection breaks so do not
/// count on receiving this to perform any housekeeping.
///
/// The function should not validate the nodes in the request and should just ignore any
/// unregistered nodes.
fn on_unregister_nodes(&mut self, session: Arc<RwLock<Session>>, nodes_to_unregister: &[NodeId]) -> Result<(), StatusCode>;
fn unregister_nodes(&mut self, session: Arc<RwLock<Session>>, nodes_to_unregister: &[NodeId]) -> Result<(), StatusCode>;
}

/// Called by the Method service when it invokes a method
pub trait Method {
/// A method is registered via the address space to a method id and optionally an object id.
/// When a client sends a CallRequest / CallMethod request, the registered object will
/// be invoked to handle the call.
fn call(&mut self, session: &mut Session, request: &CallMethodRequest) -> Result<CallMethodResult, StatusCode>;
}
2 changes: 1 addition & 1 deletion server/src/services/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ impl MessageHandler {

SupportedMessage::CallRequest(ref request) => {
validated_request!(self, request, &mut session, {
self.method_service.call(&address_space, &server_state, &mut session, request)
self.method_service.call(&mut address_space, &server_state, &mut session, request)
})
}

Expand Down
2 changes: 1 addition & 1 deletion server/src/services/method.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl MethodService {
MethodService {}
}

pub fn call(&self, address_space: &AddressSpace, server_state: &ServerState, session: &mut Session, request: &CallRequest) -> Result<SupportedMessage, StatusCode> {
pub fn call(&self, address_space: &mut AddressSpace, server_state: &ServerState, session: &mut Session, request: &CallRequest) -> Result<SupportedMessage, StatusCode> {
if let Some(ref calls) = request.methods_to_call {
if calls.len() >= server_state.max_method_calls() {
Ok(self.service_fault(&request.request_header, StatusCode::BadTooManyOperations))
Expand Down
42 changes: 23 additions & 19 deletions server/src/services/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,17 +153,19 @@ impl ViewService {
if is_empty_option_vec!(request.nodes_to_register) {
Ok(self.service_fault(&request.request_header, StatusCode::BadNothingToDo))
} else {
let nodes_to_register = request.nodes_to_register.as_ref().unwrap();
if let Some(ref mut callback) = server_state.register_nodes_callback {
let result = callback.on_register_nodes(session, &nodes_to_register[..]);
if let Ok(registered_node_ids) = result {
let response = RegisterNodesResponse {
response_header: ResponseHeader::new_good(&request.request_header),
registered_node_ids: Some(registered_node_ids),
};
Ok(response.into())
} else {
Ok(self.service_fault(&request.request_header, result.unwrap_err()))
let nodes_to_register = request.nodes_to_register.as_ref().unwrap();
match callback.register_nodes(session, &nodes_to_register[..]) {
Ok(registered_node_ids) => {
let response = RegisterNodesResponse {
response_header: ResponseHeader::new_good(&request.request_header),
registered_node_ids: Some(registered_node_ids),
};
Ok(response.into())
}
Err(err) => {
Ok(self.service_fault(&request.request_header, err))
}
}
} else {
Ok(self.service_fault(&request.request_header, StatusCode::BadNodeIdInvalid))
Expand All @@ -175,16 +177,18 @@ impl ViewService {
if is_empty_option_vec!(request.nodes_to_unregister) {
Ok(self.service_fault(&request.request_header, StatusCode::BadNothingToDo))
} else {
let nodes_to_unregister = request.nodes_to_unregister.as_ref().unwrap();
if let Some(ref mut callback) = server_state.unregister_nodes_callback {
let result = callback.on_unregister_nodes(session, &nodes_to_unregister[..]);
if let Ok(_) = result {
let response = UnregisterNodesResponse {
response_header: ResponseHeader::new_good(&request.request_header),
};
Ok(response.into())
} else {
Ok(self.service_fault(&request.request_header, result.unwrap_err()))
let nodes_to_unregister = request.nodes_to_unregister.as_ref().unwrap();
match callback.unregister_nodes(session, &nodes_to_unregister[..]) {
Ok(_) => {
let response = UnregisterNodesResponse {
response_header: ResponseHeader::new_good(&request.request_header),
};
Ok(response.into())
}
Err(err) => {
Ok(self.service_fault(&request.request_header, err))
}
}
} else {
Ok(UnregisterNodesResponse {
Expand Down
8 changes: 4 additions & 4 deletions server/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use opcua_types::{

use crate::config::{ServerConfig, ServerEndpoint};
use crate::diagnostics::ServerDiagnostics;
use crate::callbacks::*;
use crate::callbacks::{RegisterNodes, UnregisterNodes};

const TOKEN_POLICY_ANONYMOUS: &str = "anonymous";
const TOKEN_POLICY_USER_PASS_PLAINTEXT: &str = "userpass_plaintext";
Expand Down Expand Up @@ -71,9 +71,9 @@ pub struct ServerState {
/// Diagnostic information
pub diagnostics: Arc<RwLock<ServerDiagnostics>>,
/// Callback for register nodes
pub(crate) register_nodes_callback: Option<Box<OnRegisterNodes + Send + Sync>>,
pub(crate) register_nodes_callback: Option<Box<RegisterNodes + Send + Sync>>,
/// Callback for unregister nodes
pub(crate) unregister_nodes_callback: Option<Box<OnUnregisterNodes + Send + Sync>>,
pub(crate) unregister_nodes_callback: Option<Box<UnregisterNodes + Send + Sync>>,

}

Expand Down Expand Up @@ -317,7 +317,7 @@ impl ServerState {
}
}

pub fn set_register_nodes_callbacks(&mut self, register_nodes_callback: Box<OnRegisterNodes + Send + Sync>, unregister_nodes_callback: Box<OnUnregisterNodes + Send + Sync>) {
pub fn set_register_nodes_callbacks(&mut self, register_nodes_callback: Box<RegisterNodes + Send + Sync>, unregister_nodes_callback: Box<UnregisterNodes + Send + Sync>) {
self.register_nodes_callback = Some(register_nodes_callback);
self.unregister_nodes_callback = Some(unregister_nodes_callback);
}
Expand Down
Loading

0 comments on commit 2838691

Please sign in to comment.