Skip to content

Commit

Permalink
Move tick_subscriptions to the session object to fix mutability issue…
Browse files Browse the repository at this point in the history
…s and simplify code
  • Loading branch information
locka99 committed Mar 23, 2017
1 parent 8c76c37 commit ddec290
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 69 deletions.
13 changes: 7 additions & 6 deletions server/src/comms/tcp_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use opcua_core::debug::*;
use server::ServerState;
use session::SessionState;
use comms::message_handler::*;
use subscriptions::{Subscription, SubscriptionEvent};
use subscriptions::{SubscriptionEvent};

// TODO these need to go, and use session_state settings
const RECEIVE_BUFFER_SIZE: usize = 1024 * 64;
Expand Down Expand Up @@ -238,18 +238,19 @@ impl TcpTransport {
// Manage subscriptions
debug!("Timer fired");
let mut session_state = session_state.lock().unwrap();
let server_state = server_state.lock().unwrap();
let address_space = server_state.address_space.lock().unwrap();

// Request queue might contain stale publish requests
if let Some(messages) = session_state.expire_stale_publish_requests(&UTC::now()) {
let _ = subscription_timer_tx.send(SubscriptionEvent::Messages(messages));
}

// Process subscriptions
let mut subscriptions = session_state.subscriptions.lock().unwrap();
if let Some(messages) = Subscription::tick_subscriptions(&address_space, &session_state.publish_request_queue, &mut subscriptions) {
let _ = subscription_timer_tx.send(SubscriptionEvent::Messages(messages));
{
let server_state = server_state.lock().unwrap();
let address_space = server_state.address_space.lock().unwrap();
if let Some(messages) = session_state.tick_subscriptions(&address_space) {
let _ = subscription_timer_tx.send(SubscriptionEvent::Messages(messages));
}
}
});
(subscription_timer, subscription_timer_guard, subscription_timer_rx)
Expand Down
65 changes: 64 additions & 1 deletion server/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use time;
use chrono;

use opcua_core::prelude::*;
use prelude::*;

use DateTimeUTC;
use subscriptions::*;
Expand Down Expand Up @@ -43,6 +44,68 @@ impl SessionState {
}
}

/// Iterate all subscriptions calling tick on each. Note this could potentially be done to run in parallel
/// assuming the action to clean dead subscriptions was a join done after all ticks had completed.
pub fn tick_subscriptions(&mut self, address_space: &AddressSpace) -> Option<Vec<SupportedMessage>> {
let mut result = Vec::new();
let now = chrono::UTC::now();

let publish_request = self.publish_request_queue.pop();
let mut dequeue_publish_request = false;

{
let mut subscriptions = self.subscriptions.lock().unwrap();
let mut dead_subscriptions: Vec<u32> = Vec::with_capacity(subscriptions.len());

for (subscription_id, subscription) in subscriptions.iter_mut() {
// Dead subscriptions will be removed at the end
if subscription.state == SubscriptionState::Closed {
dead_subscriptions.push(*subscription_id);
} else {
let (notification_message, publish_request_action) = subscription.tick(address_space, &publish_request, &now);
if let Some(notification_message) = notification_message {
let publish_response = PublishResponse {
response_header: ResponseHeader::new_notification_response(&DateTime::now(), &GOOD),
subscription_id: *subscription_id,
available_sequence_numbers: None,
// TODO
more_notifications: subscription.more_notifications,
notification_message: notification_message,
results: None,
// TODO
diagnostic_infos: None,
};
result.push(SupportedMessage::PublishResponse(publish_response));
}
// Determine if publish request should be dequeued (after processing all subscriptions)
match publish_request_action {
PublishRequestAction::Dequeue => {
dequeue_publish_request = true;
}
_ => {}
}
}
}

// Remove dead subscriptions
for subscription_id in dead_subscriptions {
subscriptions.remove(&subscription_id);
}
}

if publish_request.is_some() && !dequeue_publish_request {
self.publish_request_queue.push(publish_request.unwrap());
}


if result.is_empty() {
None
} else {
Some(result)
}
}


pub fn expire_stale_publish_requests(&mut self, now: &DateTimeUTC) -> Option<Vec<SupportedMessage>> {
let mut expired = Vec::with_capacity(self.max_publish_requests);

Expand Down
63 changes: 1 addition & 62 deletions server/src/subscriptions/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use chrono;
use time;

use opcua_core::types::*;
use opcua_core::services::*;
use opcua_core::comms::*;

use DateTimeUTC;
Expand Down Expand Up @@ -203,69 +202,9 @@ impl Subscription {
result
}

/// Iterate all subscriptions calling tick on each. Note this could potentially be done to run in parallel
/// assuming the action to clean dead subscriptions was a join done after all ticks had completed.
pub fn tick_subscriptions(address_space: &AddressSpace, publish_requests: &Vec<PublishRequest>, subscriptions: &mut HashMap<UInt32, Subscription>) -> Option<Vec<SupportedMessage>> {
let mut dead_subscriptions: Vec<u32> = Vec::with_capacity(5);

let mut result = Vec::new();

// TODO remove this, should modify input
let mut publish_requests = publish_requests.clone();

let mut dequeue_publish_request = false;
let publish_request = publish_requests.pop();

let now = chrono::UTC::now();
for (subscription_id, subscription) in subscriptions.iter_mut() {
// Dead subscriptions will be removed at the end
if subscription.state == SubscriptionState::Closed {
dead_subscriptions.push(*subscription_id);
} else {
let (notification_message, publish_request_action) = subscription.tick(address_space, &publish_request, &now);
if let Some(notification_message) = notification_message {
let publish_response = PublishResponse {
response_header: ResponseHeader::new_notification_response(&DateTime::now(), &GOOD),
subscription_id: *subscription_id,
available_sequence_numbers: None,
// TODO
more_notifications: subscription.more_notifications,
notification_message: notification_message,
results: None,
// TODO
diagnostic_infos: None,
};
result.push(SupportedMessage::PublishResponse(publish_response));
}
// Determine if publish request should be dequeued (after processing all subscriptions)
match publish_request_action {
PublishRequestAction::Dequeue => {
dequeue_publish_request = true;
}
_ => {}
}
}
}

if publish_request.is_some() && !dequeue_publish_request {
publish_requests.push(publish_request.unwrap());
}

// Remove dead subscriptions
for subscription_id in dead_subscriptions {
subscriptions.remove(&subscription_id);
}

if result.is_empty() {
None
} else {
Some(result)
}
}

/// Checks the subscription and monitored items for state change, messages. If the tick does
/// nothing, the function returns None. Otherwise it returns one or more messages in an Vec.
fn tick(&mut self, address_space: &AddressSpace, publish_request: &Option<PublishRequest>, now: &DateTimeUTC) -> (Option<NotificationMessage>, PublishRequestAction) {
pub fn tick(&mut self, address_space: &AddressSpace, publish_request: &Option<PublishRequest>, now: &DateTimeUTC) -> (Option<NotificationMessage>, PublishRequestAction) {
debug!("subscription tick {}", self.subscription_id);

// Test if the interval has elapsed.
Expand Down

0 comments on commit ddec290

Please sign in to comment.