Skip to content

Commit

Permalink
create monitored item wasn't returning an error for non existent node…
Browse files Browse the repository at this point in the history
… ids
  • Loading branch information
locka99 committed Nov 24, 2019
1 parent 78b3f50 commit 6421377
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 32 deletions.
2 changes: 1 addition & 1 deletion client/src/comms/tcp_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ impl TcpTransport {
// Tell the writer to quit
debug!("Reader is sending a quit to the writer");
if let Err(err) = writer_tx.unbounded_send(message_queue::Message::Quit) {
debug!("Cannot sent quit to writer, error = {:?}", err);
debug!("Cannot send quit to writer, error = {:?}", err);
}
Err(std::io::ErrorKind::ConnectionReset.into())
} else {
Expand Down
2 changes: 1 addition & 1 deletion client/src/message_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl MessageQueue {

fn send_message(&mut self, message: Message) {
if let Err(err) = self.sender.as_ref().unwrap().unbounded_send(message) {
debug!("Cannot sent message to message receiver, error = {:?}", err);
debug!("Cannot send message to message receiver, error = {:?}", err);
}
}

Expand Down
20 changes: 14 additions & 6 deletions integration/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ fn connect_basic256sha256_sign() {
#[test]
#[ignore]
fn connect_basic256sha256_sign_and_encrypt() {
// Connect a session with Basic256Sha256 and SignAndEncrypt
connect_with(next_port(), endpoint_basic256sha256_sign_encrypt(), IdentityToken::Anonymous);
}

Expand All @@ -216,17 +215,17 @@ fn connect_basic128rsa15_with_username_password() {
connect_with(next_port(), endpoint_basic128rsa15_sign_encrypt(), client_user_token());
}

/// Connect a session using an invalid username/password token and expect it to fail
#[test]
#[ignore]
fn connect_basic128rsa15_with_invalid_username_password() {
// Connect a session using an invalid username/password token and expect it to fail
connect_with_invalid_active_session(next_port(), endpoint_basic128rsa15_sign_encrypt(), client_invalid_user_token());
}

/// Connect a session using an X509 key and certificate
#[test]
#[ignore]
fn connect_basic128rsa15_with_x509_token() {
// Connect a session using an X509 key and certificate
connect_with(next_port(), endpoint_basic128rsa15_sign_encrypt(), client_x509_token());
}

Expand All @@ -251,8 +250,12 @@ fn subscribe_1000() {
panic!("This shouldn't be called");
})).unwrap();

// Create monitored items

// NOTE: There is a default limit of 1000 items in arrays, so this list will go from 1 to 1000 inclusive

// Create monitored items - the last one does not exist so expect that to fail
let items_to_create = (0..1000)
.map(|i| i + 1) // From v0001 to v1000
.map(|i| (i, stress_node_id(i)))
.map(|(i, node_id)| {
MonitoredItemCreateRequest {
Expand All @@ -275,8 +278,13 @@ fn subscribe_1000() {

let results = session.create_monitored_items(subscription_id, TimestampsToReturn::Both, &items_to_create).unwrap();
results.iter().enumerate().for_each(|(i, result)| {
// debug!("Checkout {:?}", result);
assert!(result.status_code.is_good());
if i == 999 {
// Last idx var does not exist so expect it to fail
error!("Checkout {}", result.status_code);
assert!(result.status_code.is_bad());
} else {
assert!(result.status_code.is_good());
}
});

session.disconnect();
Expand Down
52 changes: 28 additions & 24 deletions server/src/subscriptions/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,35 +219,39 @@ impl Subscription {

// Add items to the subscription if they're not already in its
items_to_create.iter().map(|item_to_create| {
// Create a monitored item, if possible
let monitored_item_id = self.next_monitored_item_id;
match MonitoredItem::new(now, monitored_item_id, timestamps_to_return, item_to_create) {
Ok(monitored_item) => {
if max_monitored_items_per_sub == 0 || self.monitored_items.len() <= max_monitored_items_per_sub {
let revised_sampling_interval = monitored_item.sampling_interval();
let revised_queue_size = monitored_item.queue_size() as u32;
// Validate the filter before registering the item
match monitored_item.validate_filter(address_space) {
Ok(filter_result) => {
// Register the item with the subscription
self.monitored_items.insert(monitored_item_id, monitored_item);
self.next_monitored_item_id += 1;
MonitoredItemCreateResult {
status_code: StatusCode::Good,
monitored_item_id,
revised_sampling_interval,
revised_queue_size,
filter_result,
if !address_space.node_exists(&item_to_create.item_to_monitor.node_id) {
Self::monitored_item_create_error(StatusCode::BadNodeIdUnknown)
} else {
// Create a monitored item, if possible
let monitored_item_id = self.next_monitored_item_id;
match MonitoredItem::new(now, monitored_item_id, timestamps_to_return, item_to_create) {
Ok(monitored_item) => {
if max_monitored_items_per_sub == 0 || self.monitored_items.len() <= max_monitored_items_per_sub {
let revised_sampling_interval = monitored_item.sampling_interval();
let revised_queue_size = monitored_item.queue_size() as u32;
// Validate the filter before registering the item
match monitored_item.validate_filter(address_space) {
Ok(filter_result) => {
// Register the item with the subscription
self.monitored_items.insert(monitored_item_id, monitored_item);
self.next_monitored_item_id += 1;
MonitoredItemCreateResult {
status_code: StatusCode::Good,
monitored_item_id,
revised_sampling_interval,
revised_queue_size,
filter_result,
}
}
Err(status_code) => Self::monitored_item_create_error(status_code)
}
Err(status_code) => Self::monitored_item_create_error(status_code)
} else {
// Number of monitored items exceeds limit per sub
Self::monitored_item_create_error(StatusCode::BadTooManyMonitoredItems)
}
} else {
// Number of monitored items exceeds limit per sub
Self::monitored_item_create_error(StatusCode::BadTooManyMonitoredItems)
}
Err(status_code) => Self::monitored_item_create_error(status_code)
}
Err(status_code) => Self::monitored_item_create_error(status_code)
}
}).collect()
}
Expand Down

0 comments on commit 6421377

Please sign in to comment.