Skip to content

Commit

Permalink
Fix integration test framework, terminate tcp streams more reliably
Browse files Browse the repository at this point in the history
  • Loading branch information
locka99 committed Sep 4, 2019
1 parent fdb43a8 commit e07fa16
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 28 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ Planned future work is listed at the bottom.
- New `modbus-server` sample server which connects to a MODBUS device and presents values through OPC UA
- [Client](docs/client.md) and [Server](docs/server.md) tutorials.
- More control over limits on the server - number of subscriptions, monitored items, sessions, min publishing interval
- Integration test framework for some basic client / server scenarios such as connecting / disconnecting
with different security policies.
- TODO Fix leak issue with client disconnects not actually disconnecting all their tasks
- TODO web-client demonstrates subscribing to events from demo-server
- TODO fix leak issue with client disconnects not actually disconnecting all their tasks
- TODO Integration tests are broken and need to be fixed.
- TODO Session restore after disconnect in server. The server has to stash sessions that were
abnormally disconnected so the session state can be restored if a new connection provides the token.
- TODO prevent nested arrays from being deserialized
Expand Down
4 changes: 3 additions & 1 deletion client/src/comms/tcp_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,10 @@ impl TcpTransport {
}
if session_status_code.is_bad() {
set_connection_state!(connection.state, ConnectionState::Finished(session_status_code));
Err(std::io::ErrorKind::ConnectionReset.into())
} else {
Ok(())
}
Ok(())
}).map_err(move |e| {
error!("Read loop error {:?}", e);
let connection = trace_read_lock_unwrap!(connection_for_error);
Expand Down
1 change: 1 addition & 0 deletions integration/run.sh
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export RUST_OPCUA_LOG=debug
cargo test -- --test-threads=1 --ignored
2 changes: 1 addition & 1 deletion samples/simple-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ fn main() {
.application_uri("urn:SimpleClient")
.trust_server_certs(true)
.create_sample_keypair(true)
.session_retry_limit(0)
.session_retry_limit(3)
.client().unwrap();

if let Ok(session) = client.connect_to_endpoint((url.as_ref(), SecurityPolicy::None.to_str(), MessageSecurityMode::None, UserTokenPolicy::anonymous()), IdentityToken::Anonymous) {
Expand Down
59 changes: 35 additions & 24 deletions server/src/comms/tcp_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,28 @@
use std;
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::time::{Instant, Duration};
use std::sync::{Arc, RwLock, Mutex};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};

use chrono;
use chrono::Utc;
use futures::{
Stream, Future,
future,
sync::mpsc::{self, UnboundedSender, UnboundedReceiver, unbounded},
Future, future,
Stream,
sync::mpsc::{self, unbounded, UnboundedReceiver, UnboundedSender},
};
use tokio::{self, net::TcpStream};
use tokio_io::{AsyncRead, io::{self, ReadHalf, WriteHalf}};
use tokio_codec::FramedRead;
use tokio_io::{AsyncRead, AsyncWrite, io::{self, ReadHalf, WriteHalf}};
use tokio_timer::Interval;

use opcua_core::{
prelude::*,
comms::{
message_writer::MessageWriter,
tcp_codec::{Message, TcpCodec},
secure_channel::SecureChannel,
tcp_codec::{Message, TcpCodec},
},
prelude::*,
};
use opcua_types::{status_code::StatusCode, tcp_types::*};

Expand All @@ -38,9 +38,9 @@ use crate::{
comms::secure_channel_service::SecureChannelService,
comms::transport::*,
constants,
state::ServerState,
services::message_handler::MessageHandler,
session::Session,
state::ServerState,
subscriptions::PublishResponseEntry,
subscriptions::subscription::TickReason,
};
Expand Down Expand Up @@ -219,18 +219,20 @@ impl TcpTransport {
(writer, bytes_to_write, transport)
};
let connection_for_err = connection.clone();
io::write_all(writer.unwrap(), bytes_to_write).map_err(move |err| {
error!("Write IO error {:?}", err);
let mut transport = trace_write_lock_unwrap!(transport);
transport.finish(StatusCode::BadCommunicationError);
}).map(move |(writer, _)| {
// Build a new connection state
{
let mut connection = trace_lock_unwrap!(connection);
connection.writer = Some(writer);
}
connection
}).map_err(move |_| {
io::write_all(writer.unwrap(), bytes_to_write)
.map_err(move |err| {
error!("Write IO error {:?}", err);
let mut transport = trace_write_lock_unwrap!(transport);
transport.finish(StatusCode::BadCommunicationError);
})
.map(move |(writer, _)| {
// Build a new connection state
{
let mut connection = trace_lock_unwrap!(connection);
connection.writer = Some(writer);
}
connection
}).map_err(move |_| {
connection_for_err
})
}
Expand Down Expand Up @@ -361,7 +363,13 @@ impl TcpTransport {
transport.is_finished()
};
if finished {
info!("Writer session status is bad is terminating");
info!("Writer session status is terminating");
{
let mut connection = trace_lock_unwrap!(connection);
if let Some(ref mut writer) = connection.writer {
let _ = writer.shutdown();
}
}
Err(connection)
} else {
Ok(connection)
Expand Down Expand Up @@ -453,12 +461,15 @@ impl TcpTransport {
session_status_code = StatusCode::BadUnexpectedError;
}
}
// Update the session status
// Update the session status and drop out
if session_status_code.is_bad() {
let mut transport = trace_write_lock_unwrap!(connection.transport);
transport.finish(session_status_code);
Err(std::io::ErrorKind::ConnectionReset.into())
}
else {
Ok(())
}
Ok(())
})
.map_err(move |err| {
// Mark as finished just in case something else didn't
Expand Down

0 comments on commit e07fa16

Please sign in to comment.