Skip to content

Commit

Permalink
Simplify the simple-client, remove some PartialEq derives and make so…
Browse files Browse the repository at this point in the history
…me functions return a reference instead of a clone
  • Loading branch information
locka99 committed Apr 19, 2019
1 parent ae55846 commit fd49311
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 103 deletions.
2 changes: 1 addition & 1 deletion client/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ impl Session {
// For each monitored item
let items_to_create = subscription.monitored_items().iter().map(|(_, item)| {
MonitoredItemCreateRequest {
item_to_monitor: item.item_to_monitor(),
item_to_monitor: item.item_to_monitor().clone(),
monitoring_mode: item.monitoring_mode(),
requested_parameters: MonitoringParameters {
client_handle: item.client_handle(),
Expand Down
14 changes: 7 additions & 7 deletions client/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct MonitoredItem {
id: u32,
/// Monitored item's handle. Used internally - not modifiable
client_handle: u32,
// Item to monitor
// The thing that is actually being monitored - the node id, attribute, index, encoding.
item_to_monitor: ReadValueId,
/// Queue size
queue_size: u32,
Expand Down Expand Up @@ -81,14 +81,14 @@ impl MonitoredItem {
self.client_handle
}

pub fn item_to_monitor(&self) -> ReadValueId { self.item_to_monitor.clone() }
pub fn item_to_monitor(&self) -> &ReadValueId { &self.item_to_monitor }

pub fn sampling_interval(&self) -> f64 { self.sampling_interval }

pub fn queue_size(&self) -> u32 { self.queue_size }

pub fn value(&self) -> DataValue {
self.value.clone()
pub fn value(&self) -> &DataValue {
&self.value
}

pub fn monitoring_mode(&self) -> MonitoringMode { self.monitoring_mode }
Expand Down Expand Up @@ -146,7 +146,7 @@ pub struct Subscription {
priority: u8,
/// The change callback will be what is called if any monitored item changes within a cycle.
/// The monitored item is referenced by its id
data_change_callback: Arc<Mutex<OnDataChange + Send + Sync + 'static>>,
data_change_callback: Arc<Mutex<OnDataChange + Send + Sync>>,
/// A map of monitored items associated with the subscription (key = monitored_item_id)
monitored_items: HashMap<u32, MonitoredItem>,
/// A map of client handle to monitored item id
Expand All @@ -156,7 +156,7 @@ pub struct Subscription {
impl Subscription {
/// Creates a new subscription using the supplied parameters and the supplied data change callback.
pub fn new(subscription_id: u32, publishing_interval: f64, lifetime_count: u32, max_keep_alive_count: u32, max_notifications_per_publish: u32,
publishing_enabled: bool, priority: u8, data_change_callback: Arc<Mutex<dyn OnDataChange + Send + Sync + 'static>>)
publishing_enabled: bool, priority: u8, data_change_callback: Arc<Mutex<dyn OnDataChange + Send + Sync>>)
-> Subscription
{
Subscription {
Expand Down Expand Up @@ -189,7 +189,7 @@ impl Subscription {

pub fn priority(&self) -> u8 { self.priority }

pub fn data_change_callback(&self) -> Arc<Mutex<dyn OnDataChange + Send + Sync + 'static>> { self.data_change_callback.clone() }
pub fn data_change_callback(&self) -> Arc<Mutex<dyn OnDataChange + Send + Sync>> { self.data_change_callback.clone() }

pub(crate) fn set_publishing_interval(&mut self, publishing_interval: f64) { self.publishing_interval = publishing_interval; }

Expand Down
69 changes: 35 additions & 34 deletions samples/discovery-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
use std::str::FromStr;

use opcua_client::prelude::*;
use opcua_types::url::is_opc_ua_binary_url;

fn main() {
// Read the argument
Expand All @@ -26,41 +25,43 @@ fn main() {

// The client API has a simple `find_servers` function that connects and returns servers for us.
let mut client = Client::new(ClientConfig::new("DiscoveryClient", "urn:DiscoveryClient"));
let servers = client.find_servers(url);
if let Ok(servers) = servers {
println!("Discovery server responded with {} servers:", servers.len());
for server in &servers {
// Each server is an `ApplicationDescription`
println!("Server : {}", server.application_name);
if let Some(ref discovery_urls) = server.discovery_urls {
for discovery_url in discovery_urls {
println!(" {}", discovery_url);
if is_opc_ua_binary_url(discovery_url.as_ref()) {
// Try to talk with it and get some endpoints
let client_config = ClientConfig::new("discovery-client", "urn:discovery-client");
let client = Client::new(client_config);

// Ask the server associated with the default endpoint for its list of endpoints
match client.get_server_endpoints_from_url(discovery_url.as_ref()) {
Result::Err(status_code) => {
println!(" ERROR: Cannot get endpoints for this server url, error - {}", status_code);
continue;
}
Result::Ok(endpoints) => {
println!(" Server has these endpoints:");
for e in &endpoints {
println!(" {} - {:?} / {:?}", e.endpoint_url, SecurityPolicy::from_str(e.security_policy_uri.as_ref()).unwrap(), e.security_mode);
}
}
}
}
match client.find_servers(url) {
Ok(servers) => {
println!("Discovery server responded with {} servers:", servers.len());
servers.iter().for_each(|server| {
// Each server is an `ApplicationDescription`
println!("Server : {}", server.application_name);
if let Some(ref discovery_urls) = server.discovery_urls {
discovery_urls.iter().for_each(|discovery_url| print_server_endpoints(discovery_url.as_ref()));
} else {
println!(" No discovery urls for this server");
}
} else {
println!(" No discovery urls for this server");
}
});
}
Err(err) => {
println!("ERROR: Cannot find servers on discovery server - check this error - {:?}", err);
}
} else {
println!("ERROR: Cannot find servers on discovery server - check this error - {:?}", servers.unwrap_err());
}
}

fn print_server_endpoints(discovery_url: &str) {
println!(" {}", discovery_url);
if is_opc_ua_binary_url(discovery_url) {
// Try to talk with it and get some endpoints
let client_config = ClientConfig::new("discovery-client", "urn:discovery-client");
let client = Client::new(client_config);

// Ask the server associated with the default endpoint for its list of endpoints
match client.get_server_endpoints_from_url(discovery_url) {
Result::Ok(endpoints) => {
println!(" Server has these endpoints:");
endpoints.iter().for_each(|e| {
println!(" {} - {:?} / {:?}", e.endpoint_url, SecurityPolicy::from_str(e.security_policy_uri.as_ref()).unwrap(), e.security_mode);
});
}
Result::Err(status_code) => {
println!(" ERROR: Cannot get endpoints for this server url, error - {}", status_code);
}
}
}
}
15 changes: 4 additions & 11 deletions samples/mqtt-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,23 +104,16 @@ fn subscription_loop(session: Arc<RwLock<Session>>, tx: mpsc::Sender<(NodeId, Da
println!("Data change from server:");
let tx = tx.lock().unwrap();
items.iter().for_each(|item| {
let node_id = item.item_to_monitor().node_id;
let value = item.value();
let node_id = item.item_to_monitor().node_id.clone();
let value = item.value().clone();
let _ = tx.send((node_id, value));
});
}))?;
println!("Created a subscription with id = {}", subscription_id);

// Create some monitored items
let read_nodes = vec![
ReadValueId::from(NodeId::new(2, "v1")),
ReadValueId::from(NodeId::new(2, "v2")),
ReadValueId::from(NodeId::new(2, "v3")),
ReadValueId::from(NodeId::new(2, "v4")),
];
let items_to_create: Vec<MonitoredItemCreateRequest> = read_nodes.into_iter().map(|read_node| {
MonitoredItemCreateRequest::new(read_node, MonitoringMode::Reporting, MonitoringParameters::default())
}).collect();
let items_to_create: Vec<MonitoredItemCreateRequest> = ["v1", "v2", "v3", "v4"].iter()
.map(|v| NodeId::new(2, *v).into()).collect();
let _ = session.create_monitored_items(subscription_id, TimestampsToReturn::Both, &items_to_create)?;
}

Expand Down
77 changes: 30 additions & 47 deletions samples/simple-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
//!
//! 1. Read a configuration file (either default or the one specified using --config)
//! 2. Connect & create a session on one of those endpoints that match with its config (you can override which using --endpoint-id arg)
//! 3. Either:
//! a) Read some values and exit
//! b) Subscribe to values and loop forever printing out their values (using --subscribe)
//! 3. Subscribe to values and loop forever printing out their values
use std::sync::{Arc, RwLock};

use clap::{App, Arg};
Expand All @@ -13,17 +11,15 @@ use opcua_client::prelude::*;

fn main() {
// Read command line arguments
let url = {
let m = App::new("Simple OPC UA Client")
.arg(Arg::with_name("url")
.long("url")
.help("Specify the OPC UA endpoint to connect to")
.takes_value(true)
.default_value("opc.tcp://localhost:4855")
.required(false))
.get_matches();
m.value_of("url").unwrap().to_string()
};
let m = App::new("Simple OPC UA Client")
.arg(Arg::with_name("url")
.long("url")
.help("Specify the OPC UA endpoint to connect to")
.takes_value(true)
.default_value("opc.tcp://localhost:4855")
.required(false))
.get_matches();
let url = m.value_of("url").unwrap().to_string();

// Optional - enable OPC UA logging
opcua_console_logging::init();
Expand All @@ -36,48 +32,35 @@ fn main() {
.client().unwrap();

if let Ok(session) = client.connect_to_endpoint((url.as_ref(), SecurityPolicy::None.to_str(), MessageSecurityMode::None, UserTokenPolicy::anonymous()), IdentityToken::Anonymous) {
if let Err(result) = subscription_loop(session) {
println!("ERROR: Got an error while performing action - {}", result);
if let Err(result) = subscribe_to_variables(session.clone()) {
println!("ERROR: Got an error while subscribing to variables - {}", result);
} else {
// Loops forever. The publish thread will call the callback with changes on the variables
let _ = Session::run(session);
}
}
}

fn subscription_loop(session: Arc<RwLock<Session>>) -> Result<(), StatusCode> {
// This scope is important - we don't want the session to be locked when the code hits the
// loop below
{
let mut session = session.write().unwrap();
fn subscribe_to_variables(session: Arc<RwLock<Session>>) -> Result<(), StatusCode> {
let mut session = session.write().unwrap();
// Creates a subscription with a data change callback
let subscription_id = session.create_subscription(2000.0, 10, 30, 0, 0, true, DataChangeCallback::new(|changed_monitored_items| {
println!("Data change from server:");
changed_monitored_items.iter().for_each(|item| print_value(item));
}))?;
println!("Created a subscription with id = {}", subscription_id);

// Creates our subscription
let subscription_id = session.create_subscription(2000.0, 10, 30, 0, 0, true, DataChangeCallback::new(|items| {
println!("Data change from server:");
items.iter().for_each(|item| {
print_value(&item.item_to_monitor(), &item.value());
});
}))?;
println!("Created a subscription with id = {}", subscription_id);

// Create some monitored items
let read_nodes = vec![
ReadValueId::from(NodeId::new(2, "v1")),
ReadValueId::from(NodeId::new(2, "v2")),
ReadValueId::from(NodeId::new(2, "v3")),
ReadValueId::from(NodeId::new(2, "v4")),
];
let items_to_create: Vec<MonitoredItemCreateRequest> = read_nodes.into_iter().map(|read_node| {
MonitoredItemCreateRequest::new(read_node, MonitoringMode::Reporting, MonitoringParameters::default())
}).collect();
let _ = session.create_monitored_items(subscription_id, TimestampsToReturn::Both, &items_to_create)?;
}

// Loops forever. The publish thread will call the callback with changes on the variables
let _ = Session::run(session);
// Create some monitored items
let items_to_create: Vec<MonitoredItemCreateRequest> = ["v1", "v2", "v3", "v4"].iter()
.map(|v| NodeId::new(2, *v).into()).collect();
let _ = session.create_monitored_items(subscription_id, TimestampsToReturn::Both, &items_to_create)?;

Ok(())
}

fn print_value(read_value_id: &ReadValueId, data_value: &DataValue) {
let node_id = read_value_id.node_id.to_string();
fn print_value(item: &MonitoredItem) {
let node_id = &item.item_to_monitor().node_id;
let data_value = item.value();
if let Some(ref value) = data_value.value {
println!("Item \"{}\", Value = {:?}", node_id, value);
} else {
Expand Down
2 changes: 1 addition & 1 deletion samples/web-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ impl OPCUASession {
// Create some monitored items
let items_to_create: Vec<MonitoredItemCreateRequest> = node_ids.iter().map(|node_id| {
let node_id = NodeId::from_str(node_id).unwrap(); // Trust client to not break this
MonitoredItemCreateRequest::new(node_id.into(), MonitoringMode::Reporting, MonitoringParameters::default())
node_id.into()
}).collect();
let _results = session.create_monitored_items(subscription_id, TimestampsToReturn::Both, &items_to_create);
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/comms/tcp_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ impl TcpTransport {
/// Start the subscription timer to service subscriptions
fn spawn_subscriptions_task(transport: Arc<RwLock<TcpTransport>>, sender: UnboundedSender<(u32, SupportedMessage)>) {
/// Subscription events are passed sent from the monitor task to the receiver
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug)]
enum SubscriptionEvent {
PublishResponses(VecDeque<PublishResponseEntry>),
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/subscriptions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub struct PublishRequestEntry {
pub results: Option<Vec<StatusCode>>,
}

#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug)]
pub struct PublishResponseEntry {
pub request_id: u32,
pub response: SupportedMessage,
Expand Down
6 changes: 6 additions & 0 deletions types/src/service_types/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,12 @@ impl SignatureData {
}
}

impl Into<MonitoredItemCreateRequest> for NodeId {
fn into(self) -> MonitoredItemCreateRequest {
MonitoredItemCreateRequest::new(self.into(), MonitoringMode::Reporting, MonitoringParameters::default())
}
}

impl MonitoredItemCreateRequest {
/// Adds an item to monitor to the subscription
pub fn new(item_to_monitor: ReadValueId, monitoring_mode: MonitoringMode, requested_parameters: MonitoringParameters) -> MonitoredItemCreateRequest {
Expand Down

0 comments on commit fd49311

Please sign in to comment.