Skip to content

Commit

Permalink
Subscription work, adding openssl as a feature
Browse files Browse the repository at this point in the history
  • Loading branch information
locka99 committed Mar 24, 2017
1 parent f65b097 commit 54f7825
Show file tree
Hide file tree
Showing 12 changed files with 193 additions and 98 deletions.
11 changes: 0 additions & 11 deletions 3rd-party/3rd-party.iml
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,5 @@
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module-library">
<library name="PHP Runtime" type="php">
<CLASSES>
<root url="jar://$APPLICATION_PLUGINS_DIR$/php/lib/php.jar!/stubs/standard" />
</CLASSES>
<JAVADOC />
<SOURCES>
<root url="jar://$APPLICATION_PLUGINS_DIR$/php/lib/php.jar!/stubs/standard" />
</SOURCES>
</library>
</orderEntry>
</component>
</module>
71 changes: 71 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,13 @@ rand = "0.3"
regex = "0.2.1"
lazy_static = "0.2"

# openssl = "0.9.2"
# OpenSSL is optional due to problems building and installing it on some platforms.
openssl = { version = "0.9.9", optional = true }

[features]
# The default package is no crypto.
default = []

# Crypto relies on openssl which is a bit of a pain to use on some platforms. Build without if you have trouble with it
# or simply don't need the functionality.
crypto = ["openssl"]
8 changes: 8 additions & 0 deletions core/src/crypto/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
//! Contains the wrappers for crypto for OPC UA.
//!
//! This file is an optional component of the stack. If it isn't compiled in, then the OPC UA
//! impl will not support encryption, decryption, signing or verification.
// TODO

use openssl::*;
11 changes: 0 additions & 11 deletions docs/docs.iml
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,5 @@
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module-library">
<library name="PHP Runtime" type="php">
<CLASSES>
<root url="jar://$APPLICATION_PLUGINS_DIR$/php/lib/php.jar!/stubs/standard" />
</CLASSES>
<JAVADOC />
<SOURCES>
<root url="jar://$APPLICATION_PLUGINS_DIR$/php/lib/php.jar!/stubs/standard" />
</SOURCES>
</library>
</orderEntry>
</component>
</module>
2 changes: 1 addition & 1 deletion server/src/comms/tcp_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const SEND_BUFFER_SIZE: usize = 1024 * 64;
const MAX_MESSAGE_SIZE: usize = 1024 * 64;

// Rate at which subscriptions are serviced
const SUBSCRIPTION_TIMER_RATE: i64 = 5000;
const SUBSCRIPTION_TIMER_RATE: i64 = 500;

#[derive(Clone, Debug, PartialEq)]
pub enum TransportState {
Expand Down
52 changes: 48 additions & 4 deletions server/src/services/monitored_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,55 @@ impl MonitoredItemService {
Ok(SupportedMessage::CreateMonitoredItemsResponse(response))
}

pub fn modify_monitored_items(&self, _: &mut ServerState, _: &mut SessionState, _: ModifyMonitoredItemsRequest) -> Result<SupportedMessage, &'static StatusCode> {
unimplemented!();
pub fn modify_monitored_items(&self, _: &mut ServerState, session_state: &mut SessionState, request: ModifyMonitoredItemsRequest) -> Result<SupportedMessage, &'static StatusCode> {
let mut service_status = &GOOD;
let results = if let Some(ref items_to_modify) = request.items_to_modify {
// Find subscription and add items to it
let mut subscriptions = session_state.subscriptions.lock().unwrap();
let subscription_id = request.subscription_id;
if let Some(mut subscription) = subscriptions.get_mut(&subscription_id) {
Some(subscription.modify_monitored_items(items_to_modify))
} else {
// No matching subscription
service_status = &BAD_SUBSCRIPTION_ID_INVALID;
None
}
} else {
// No items to modify so nothing to do
service_status = &BAD_NOTHING_TO_DO;
None
};
let response = ModifyMonitoredItemsResponse {
response_header: ResponseHeader::new_service_result(&DateTime::now(), &request.request_header, service_status),
results: results,
diagnostic_infos: None
};
Ok(SupportedMessage::ModifyMonitoredItemsResponse(response))
}

pub fn delete_monitored_items(&self, _: &mut ServerState, _: &mut SessionState, _: DeleteMonitoredItemsRequest) -> Result<SupportedMessage, &'static StatusCode> {
unimplemented!();
pub fn delete_monitored_items(&self, _: &mut ServerState, session_state: &mut SessionState, request: DeleteMonitoredItemsRequest) -> Result<SupportedMessage, &'static StatusCode> {
let mut service_status = &GOOD;
let results = if let Some(ref items_to_delete) = request.monitored_item_ids {
// Find subscription and add items to it
let mut subscriptions = session_state.subscriptions.lock().unwrap();
let subscription_id = request.subscription_id;
if let Some(mut subscription) = subscriptions.get_mut(&subscription_id) {
Some(subscription.delete_monitored_items(items_to_delete))
} else {
// No matching subscription
service_status = &BAD_SUBSCRIPTION_ID_INVALID;
None
}
} else {
// No items to modify so nothing to do
service_status = &BAD_NOTHING_TO_DO;
None
};
let response = DeleteMonitoredItemsResponse {
response_header: ResponseHeader::new_service_result(&DateTime::now(), &request.request_header, service_status),
results: results,
diagnostic_infos: None
};
Ok(SupportedMessage::DeleteMonitoredItemsResponse(response))
}
}
15 changes: 6 additions & 9 deletions server/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl SessionState {
if subscription.state == SubscriptionState::Closed {
dead_subscriptions.push(*subscription_id);
} else {
let (notification_message, publish_request_action) = subscription.tick(address_space, &publish_request, &now);
let (notification_message, publish_request_action, subscription_ack_results) = subscription.tick(address_space, &publish_request, &now);
if let Some(notification_message) = notification_message {
let publish_response = PublishResponse {
response_header: ResponseHeader::new_notification_response(&DateTime::now(), &GOOD),
Expand Down Expand Up @@ -97,12 +97,7 @@ impl SessionState {
self.publish_request_queue.push(publish_request.unwrap());
}


if result.is_empty() {
None
} else {
Some(result)
}
if result.is_empty() { None } else { Some(result) }
}


Expand All @@ -117,10 +112,12 @@ impl SessionState {
} else {
MAX_REQUEST_TIMEOUT
};
let timeout = time::Duration::milliseconds(timeout);
let timeout_d = time::Duration::milliseconds(timeout);

// The request has timed out if the timestamp plus hint exceeds the input time
if timestamp + timeout >= *now {
let expiration_time = timestamp + timeout_d;
if *now >= expiration_time {
debug!("Publish request {} has expired - timestamp = {:?}, expiration hint = {}, expiration time = {:?}, time now = {:?}, ", r.request_header.request_handle, timestamp, timeout, expiration_time, now);
let now = DateTime::from_chrono(now);
let response = PublishResponse {
response_header: ResponseHeader::new_service_result(&now, &r.request_header, &BAD_REQUEST_TIMEOUT),
Expand Down
Loading

0 comments on commit 54f7825

Please sign in to comment.