Skip to content

Commit

Permalink
Make sure subscriptions are kept alive (locka99#187)
Browse files Browse the repository at this point in the history
Thanks I'll take it
  • Loading branch information
svanharmelen authored Apr 12, 2022
1 parent 9b4d1b2 commit 3455b65
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 4 deletions.
77 changes: 75 additions & 2 deletions client/src/session/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,19 +184,24 @@ impl Session {
Role::Client,
decoding_options,
)));

let message_queue = Arc::new(RwLock::new(MessageQueue::new()));

let subscription_state = Arc::new(RwLock::new(SubscriptionState::new()));

let session_state = Arc::new(RwLock::new(SessionState::new(
ignore_clock_skew,
secure_channel.clone(),
subscription_state.clone(),
message_queue.clone(),
)));

let transport = TcpTransport::new(
secure_channel.clone(),
session_state.clone(),
message_queue.clone(),
single_threaded_executor,
);
let subscription_state = Arc::new(RwLock::new(SubscriptionState::new()));

// This runtime is single threaded. The one for the transport may be multi-threaded
let runtime = tokio::runtime::Builder::new_current_thread()
Expand Down Expand Up @@ -232,6 +237,7 @@ impl Session {
self.session_state = Arc::new(RwLock::new(SessionState::new(
self.ignore_clock_skew,
self.secure_channel.clone(),
self.subscription_state.clone(),
self.message_queue.clone(),
)));

Expand Down Expand Up @@ -854,6 +860,69 @@ impl Session {
});
}

/// Start a task that will periodically send a publish request to keep the subscriptions alive.
/// The request rate will be 3/4 of the shortest (revised publishing interval * the revised keep
/// alive count) of all subscriptions that belong to a single session.
fn spawn_subscription_activity_task(&self) {
session_debug!(self, "spawn_subscription_activity_task",);

let connection_state = {
let session_state = trace_read_lock!(self.session_state);
session_state.connection_state()
};

const MIN_SUBSCRIPTION_ACTIVITY_MS: u64 = 1000;
let session_state = self.session_state.clone();
let subscription_state = self.subscription_state.clone();

let id = format!("subscription-activity-thread-{:?}", thread::current().id());
let runtime = trace_lock!(self.runtime);
runtime.spawn(async move {
register_runtime_component!(&id);

// The timer runs at a higher frequency timer loop to terminate as soon after the session
// state has terminated. Each time it runs it will test if the interval has elapsed or not.
let mut timer = interval(Duration::from_millis(MIN_SUBSCRIPTION_ACTIVITY_MS));

let mut last_timeout: Instant;
let mut subscription_activity_interval: Duration;

loop {
timer.tick().await;

if connection_state.is_finished() {
info!("Session activity timer is terminating");
break;
}

if let (Some(keep_alive_timeout), last_publish_request) = {
let subscription_state = trace_read_lock!(subscription_state);
(
subscription_state.keep_alive_timeout(),
subscription_state.last_publish_request(),
)
} {
subscription_activity_interval =
Duration::from_millis((keep_alive_timeout / 4) * 3);
last_timeout = last_publish_request;

// Get the time now
let now = Instant::now();

// Calculate to interval since last check
let interval = now - last_timeout;
if interval > subscription_activity_interval {
let mut session_state = trace_write_lock!(session_state);
let _ = session_state.async_publish();
}
}
}

info!("Subscription activity timer task is finished");
deregister_runtime_component!(&id);
});
}

/// This is the internal handler for create subscription that receives the callback wrapped up and reference counted.
fn create_subscription_inner(
&self,
Expand Down Expand Up @@ -1251,7 +1320,10 @@ impl Session {
// Turn off publish requests until server says otherwise
debug!("Server tells us too many publish requests so waiting for a response before resuming");
}
StatusCode::BadSessionClosed | StatusCode::BadSessionIdInvalid => {
StatusCode::BadSessionClosed
| StatusCode::BadSessionIdInvalid
| StatusCode::BadNoSubscription
| StatusCode::BadSubscriptionIdInvalid => {
let mut session_state = trace_write_lock!(self.session_state);
session_state.on_session_closed(service_result)
}
Expand Down Expand Up @@ -1500,6 +1572,7 @@ impl SessionService for Session {
response.revised_session_timeout
);
self.spawn_session_activity_task(response.revised_session_timeout);
self.spawn_subscription_activity_task();

// TODO Verify signature using server's public key (from endpoint) comparing with data made from client certificate and nonce.
// crypto::verify_signature_data(verification_key, security_policy, server_certificate, client_certificate, client_nonce);
Expand Down
14 changes: 13 additions & 1 deletion client/src/session/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ use opcua_core::{
};
use opcua_crypto::SecurityPolicy;
use opcua_types::{status_code::StatusCode, *};
use tokio::time::Instant;

use crate::{
callbacks::{OnConnectionStatusChange, OnSessionClosed},
message_queue::MessageQueue,
subscription_state::SubscriptionState,
};

#[derive(Copy, Clone, PartialEq, Debug)]
Expand Down Expand Up @@ -128,6 +130,8 @@ pub(crate) struct SessionState {
monitored_item_handle: Handle,
/// Subscription acknowledgements pending for send
subscription_acknowledgements: Vec<SubscriptionAcknowledgement>,
/// Subscription state
subscription_state: Arc<RwLock<SubscriptionState>>,
/// The message queue
message_queue: Arc<RwLock<MessageQueue>>,
/// Connection closed callback
Expand Down Expand Up @@ -164,6 +168,7 @@ impl SessionState {
pub fn new(
ignore_clock_skew: bool,
secure_channel: Arc<RwLock<SecureChannel>>,
subscription_state: Arc<RwLock<SubscriptionState>>,
message_queue: Arc<RwLock<MessageQueue>>,
) -> SessionState {
let id = NEXT_SESSION_ID.fetch_add(1, Ordering::Relaxed);
Expand All @@ -182,8 +187,9 @@ impl SessionState {
session_id: NodeId::null(),
authentication_token: NodeId::null(),
monitored_item_handle: Handle::new(Self::FIRST_MONITORED_ITEM_HANDLE),
message_queue,
subscription_acknowledgements: Vec::new(),
subscription_state,
message_queue,
session_closed_callback: None,
connection_status_callback: None,
}
Expand Down Expand Up @@ -301,6 +307,12 @@ impl SessionState {
subscription_acknowledgements,
};
let request_handle = self.async_send_request(request, None)?;

{
let mut subscription_state = trace_write_lock!(self.subscription_state);
subscription_state.set_last_publish_request(Instant::now());
}

debug!("async_publish, request sent with handle {}", request_handle);
Ok(request_handle)
}
Expand Down
33 changes: 32 additions & 1 deletion client/src/subscription_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,25 @@
use std::collections::HashMap;

use opcua_types::service_types::{DataChangeNotification, EventNotificationList};
use tokio::time::Instant;

use crate::subscription::*;

/// Holds the live subscription state
pub struct SubscriptionState {
/// Subscripion keep alive timeout
keep_alive_timeout: Option<u64>,
/// Timestamp of last pushish request
last_publish_request: Instant,
/// Subscriptions (key = subscription_id)
subscriptions: HashMap<u32, Subscription>,
}

impl SubscriptionState {
pub fn new() -> SubscriptionState {
SubscriptionState {
keep_alive_timeout: None,
last_publish_request: Instant::now(),
subscriptions: HashMap::new(),
}
}
Expand All @@ -40,6 +47,7 @@ impl SubscriptionState {
pub(crate) fn add_subscription(&mut self, subscription: Subscription) {
self.subscriptions
.insert(subscription.subscription_id(), subscription);
self.set_keep_alive_timeout();
}

pub(crate) fn modify_subscription(
Expand All @@ -57,11 +65,14 @@ impl SubscriptionState {
subscription.set_max_keep_alive_count(max_keep_alive_count);
subscription.set_max_notifications_per_publish(max_notifications_per_publish);
subscription.set_priority(priority);
self.set_keep_alive_timeout();
}
}

pub(crate) fn delete_subscription(&mut self, subscription_id: u32) -> Option<Subscription> {
self.subscriptions.remove(&subscription_id)
let subscription = self.subscriptions.remove(&subscription_id);
self.set_keep_alive_timeout();
subscription
}

pub(crate) fn set_publishing_mode(
Expand Down Expand Up @@ -129,4 +140,24 @@ impl SubscriptionState {
subscription.set_triggering(triggering_item_id, links_to_add, links_to_remove);
}
}

pub(crate) fn last_publish_request(&self) -> Instant {
self.last_publish_request
}

pub(crate) fn set_last_publish_request(&mut self, now: Instant) {
self.last_publish_request = now;
}

pub(crate) fn keep_alive_timeout(&self) -> Option<u64> {
self.keep_alive_timeout
}

fn set_keep_alive_timeout(&mut self) {
self.keep_alive_timeout = self
.subscriptions
.values()
.map(|v| (v.publishing_interval() * v.lifetime_count() as f64).floor() as u64)
.min()
}
}

0 comments on commit 3455b65

Please sign in to comment.