From e07fa163058f621a61df7295d897a1d00c515175 Mon Sep 17 00:00:00 2001 From: Adam Lock Date: Wed, 4 Sep 2019 21:23:16 +0100 Subject: [PATCH] Fix integration test framework, terminate tcp streams more reliably --- CHANGELOG.md | 5 +-- client/src/comms/tcp_transport.rs | 4 ++- integration/run.sh | 1 + samples/simple-client/src/main.rs | 2 +- server/src/comms/tcp_transport.rs | 59 ++++++++++++++++++------------- 5 files changed, 43 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d6e4978c3..84b7f7639 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,9 +22,10 @@ Planned future work is listed at the bottom. - New `modbus-server` sample server which connects to a MODBUS device and presents values through OPC UA - [Client](docs/client.md) and [Server](docs/server.md) tutorials. - More control over limits on the server - number of subscriptions, monitored items, sessions, min publishing interval + - Integration test framework for some basic client / server scenarios such as connecting / disconnecting + with different security policies. + - TODO Fix leak issue with client disconnects not actually disconnecting all their tasks - TODO web-client demonstrates subscribing to events from demo-server - - TODO fix leak issue with client disconnects not actually disconnecting all their tasks - - TODO Integration tests are broken and need to be fixed. - TODO Session restore after disconnect in server. The server has to stash sessions that were abnormally disconnected so the session state can be restored if a new connection provides the token. - TODO prevent nested arrays from being deserialized diff --git a/client/src/comms/tcp_transport.rs b/client/src/comms/tcp_transport.rs index 30522a3e7..c69bc425c 100644 --- a/client/src/comms/tcp_transport.rs +++ b/client/src/comms/tcp_transport.rs @@ -426,8 +426,10 @@ impl TcpTransport { } if session_status_code.is_bad() { set_connection_state!(connection.state, ConnectionState::Finished(session_status_code)); + Err(std::io::ErrorKind::ConnectionReset.into()) + } else { + Ok(()) } - Ok(()) }).map_err(move |e| { error!("Read loop error {:?}", e); let connection = trace_read_lock_unwrap!(connection_for_error); diff --git a/integration/run.sh b/integration/run.sh index e0ca834ed..4b5da0af5 100644 --- a/integration/run.sh +++ b/integration/run.sh @@ -1 +1,2 @@ +export RUST_OPCUA_LOG=debug cargo test -- --test-threads=1 --ignored diff --git a/samples/simple-client/src/main.rs b/samples/simple-client/src/main.rs index 1bb7f513b..5a49f195c 100644 --- a/samples/simple-client/src/main.rs +++ b/samples/simple-client/src/main.rs @@ -30,7 +30,7 @@ fn main() { .application_uri("urn:SimpleClient") .trust_server_certs(true) .create_sample_keypair(true) - .session_retry_limit(0) + .session_retry_limit(3) .client().unwrap(); if let Ok(session) = client.connect_to_endpoint((url.as_ref(), SecurityPolicy::None.to_str(), MessageSecurityMode::None, UserTokenPolicy::anonymous()), IdentityToken::Anonymous) { diff --git a/server/src/comms/tcp_transport.rs b/server/src/comms/tcp_transport.rs index b62cbd3c9..b4c7845ac 100644 --- a/server/src/comms/tcp_transport.rs +++ b/server/src/comms/tcp_transport.rs @@ -8,28 +8,28 @@ use std; use std::collections::VecDeque; use std::net::SocketAddr; -use std::time::{Instant, Duration}; -use std::sync::{Arc, RwLock, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::{Duration, Instant}; use chrono; use chrono::Utc; use futures::{ - Stream, Future, - future, - sync::mpsc::{self, UnboundedSender, UnboundedReceiver, unbounded}, + Future, future, + Stream, + sync::mpsc::{self, unbounded, UnboundedReceiver, UnboundedSender}, }; use tokio::{self, net::TcpStream}; -use tokio_io::{AsyncRead, io::{self, ReadHalf, WriteHalf}}; use tokio_codec::FramedRead; +use tokio_io::{AsyncRead, AsyncWrite, io::{self, ReadHalf, WriteHalf}}; use tokio_timer::Interval; use opcua_core::{ - prelude::*, comms::{ message_writer::MessageWriter, - tcp_codec::{Message, TcpCodec}, secure_channel::SecureChannel, + tcp_codec::{Message, TcpCodec}, }, + prelude::*, }; use opcua_types::{status_code::StatusCode, tcp_types::*}; @@ -38,9 +38,9 @@ use crate::{ comms::secure_channel_service::SecureChannelService, comms::transport::*, constants, - state::ServerState, services::message_handler::MessageHandler, session::Session, + state::ServerState, subscriptions::PublishResponseEntry, subscriptions::subscription::TickReason, }; @@ -219,18 +219,20 @@ impl TcpTransport { (writer, bytes_to_write, transport) }; let connection_for_err = connection.clone(); - io::write_all(writer.unwrap(), bytes_to_write).map_err(move |err| { - error!("Write IO error {:?}", err); - let mut transport = trace_write_lock_unwrap!(transport); - transport.finish(StatusCode::BadCommunicationError); - }).map(move |(writer, _)| { - // Build a new connection state - { - let mut connection = trace_lock_unwrap!(connection); - connection.writer = Some(writer); - } - connection - }).map_err(move |_| { + io::write_all(writer.unwrap(), bytes_to_write) + .map_err(move |err| { + error!("Write IO error {:?}", err); + let mut transport = trace_write_lock_unwrap!(transport); + transport.finish(StatusCode::BadCommunicationError); + }) + .map(move |(writer, _)| { + // Build a new connection state + { + let mut connection = trace_lock_unwrap!(connection); + connection.writer = Some(writer); + } + connection + }).map_err(move |_| { connection_for_err }) } @@ -361,7 +363,13 @@ impl TcpTransport { transport.is_finished() }; if finished { - info!("Writer session status is bad is terminating"); + info!("Writer session status is terminating"); + { + let mut connection = trace_lock_unwrap!(connection); + if let Some(ref mut writer) = connection.writer { + let _ = writer.shutdown(); + } + } Err(connection) } else { Ok(connection) @@ -453,12 +461,15 @@ impl TcpTransport { session_status_code = StatusCode::BadUnexpectedError; } } - // Update the session status + // Update the session status and drop out if session_status_code.is_bad() { let mut transport = trace_write_lock_unwrap!(connection.transport); transport.finish(session_status_code); + Err(std::io::ErrorKind::ConnectionReset.into()) + } + else { + Ok(()) } - Ok(()) }) .map_err(move |err| { // Mark as finished just in case something else didn't