Skip to content

Commit

Permalink
Support for DTLS and other transport layers, make client async
Browse files Browse the repository at this point in the history
features:
- Client now uses async and tokio
- Observe now takes ownership of the client to avoid missed packets
- DTLS is supported using webrtc-rs
- DTLS is a feature, can be turned off if needed
- Other backends for transport can be provided by users. They must be
  able to provide a SockAddr for use with observe, butthis could be
  theoretically hardcoded if you want to use some non-ip protocol
- Server is reworked to support multiple listeners at the same time
- Server should no longer block on a single long async request handlers
- Added tests for DTLS using PSK and PKI
- Added example for DTLS using PSK
- Moved all tests to use `tokio::test`
  • Loading branch information
Jose Ignacio Biehl authored and Jose Ignacio Biehl committed Jan 5, 2024
1 parent 773aacf commit 3c82e50
Show file tree
Hide file tree
Showing 19 changed files with 1,661 additions and 872 deletions.
17 changes: 17 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,23 @@ bytes = "^1.1"
coap-lite = "0.11.3"
lru_time_cache = "0.11.11"
mio = "=0.8.8" # fix windows broken, remove it after mio updated
async-trait = "0.1.74"

# dependencies for dtls
webrtc-dtls = {version = "0.8.0", optional = true}
webrtc-util = {version = "0.8.0", optional = true}
rustls = {version = "^0.21.1", optional = true}
rustls-pemfile = {version = "2.0.0", optional = true}
rcgen = {version = "^0.11.0", optional = true}
pkcs8 = {version = "0.10.2", optional = true}
sec1 = { version = "0.7.3", features = ["pem", "pkcs8", "std"], optional = true}

[features]
default = ["dtls"]
dtls = ["dep:webrtc-dtls", "dep:webrtc-util", "dep:rustls", "dep:rustls-pemfile", "dep:rcgen", "dep:pkcs8", "dep:sec1"]


[dev-dependencies]
quickcheck = "0.8.2"


72 changes: 40 additions & 32 deletions benches/server.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,47 @@
#![feature(test, async_closure)]

extern crate test;

use coap::{CoAPClient, Server};
use std::net::SocketAddr;

use coap::{server::UdpCoapListener, Server, UDPCoAPClient};
use coap_lite::{CoapOption, CoapRequest, MessageType};
use std::{sync::mpsc, thread};
use tokio::runtime::Runtime;
use tokio::{net::UdpSocket, runtime::Runtime};

#[bench]
fn bench_server_with_request(b: &mut test::Bencher) {
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
Runtime::new().unwrap().block_on(async move {
let mut server = Server::new("127.0.0.1:0").unwrap();

tx.send(server.socket_addr().unwrap().port()).unwrap();

server
.run(async move |request| {
let uri_path = request.get_path().to_string();

return match request.response {
Some(mut response) => {
response.message.payload = uri_path.as_bytes().to_vec();
Some(response)
}
_ => None,
};
})
.await
.unwrap();
});
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

let rt = Runtime::new().unwrap();
rt.spawn(async move {
let sock = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let addr = sock.local_addr().unwrap();
let listener = Box::new(UdpCoapListener::from_socket(sock));
let server = Server::from_listeners(vec![listener]);

tx.send(addr.port()).unwrap();

server
.run(move |mut request: Box<CoapRequest<SocketAddr>>| async {
let uri_path = request.get_path().to_string();

match request.response {
Some(ref mut response) => {
response.message.payload = uri_path.as_bytes().to_vec();
}
_ => {}
};
return request;
})
.await
.unwrap();
});

let server_port = rx.recv().unwrap();
let client = CoAPClient::new(format!("127.0.0.1:{}", server_port)).unwrap();
let server_port = rx.blocking_recv().unwrap();
let client = rt.block_on(async {
UDPCoAPClient::new_udp(format!("127.0.0.1:{}", server_port))
.await
.unwrap()
});

let mut request = CoapRequest::new();
request.message.header.set_version(1);
Expand All @@ -48,8 +54,10 @@ fn bench_server_with_request(b: &mut test::Bencher) {
.add_option(CoapOption::UriPath, "test".to_string().into_bytes());

b.iter(|| {
client.send(&request).unwrap();
let recv_packet = client.receive().unwrap();
assert_eq!(recv_packet.message.payload, b"test".to_vec());
rt.block_on(async {
client.send(&request).await.unwrap();
let recv_packet = client.receive().await.unwrap();
assert_eq!(recv_packet.message.payload, b"test".to_vec());
});
});
}
36 changes: 19 additions & 17 deletions examples/client.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,32 @@
extern crate coap;

use coap::CoAPClient;
use coap::UDPCoAPClient;
use std::io;
use std::io::ErrorKind;

fn main() {
#[tokio::main]
async fn main() {
println!("GET url:");
example_get();
example_get().await;

println!("POST data to url:");
example_post();
example_post().await;

println!("PUT data to url:");
example_put();
example_put().await;

println!("DELETE url:");
example_delete();
example_delete().await;

println!("Observing:");
example_observe();
example_observe().await;
}

fn example_get() {
async fn example_get() {
let url = "coap://127.0.0.1:5683/hello/get";
println!("Client request: {}", url);

match CoAPClient::get(url) {
match UDPCoAPClient::get(url).await {
Ok(response) => {
println!(
"Server reply: {}",
Expand All @@ -42,12 +43,12 @@ fn example_get() {
}
}

fn example_post() {
async fn example_post() {
let url = "coap://127.0.0.1:5683/hello/post";
let data = b"data".to_vec();
println!("Client request: {}", url);

match CoAPClient::post(url, data) {
match UDPCoAPClient::post(url, data).await {
Ok(response) => {
println!(
"Server reply: {}",
Expand All @@ -64,12 +65,12 @@ fn example_post() {
}
}

fn example_put() {
async fn example_put() {
let url = "coap://127.0.0.1:5683/hello/put";
let data = b"data".to_vec();
println!("Client request: {}", url);

match CoAPClient::put(url, data) {
match UDPCoAPClient::put(url, data).await {
Ok(response) => {
println!(
"Server reply: {}",
Expand All @@ -86,11 +87,11 @@ fn example_put() {
}
}

fn example_delete() {
async fn example_delete() {
let url = "coap://127.0.0.1:5683/hello/delete";
println!("Client request: {}", url);

match CoAPClient::delete(url) {
match UDPCoAPClient::delete(url).await {
Ok(response) => {
println!(
"Server reply: {}",
Expand All @@ -107,15 +108,16 @@ fn example_delete() {
}
}

fn example_observe() {
let mut client = CoAPClient::new("127.0.0.1:5683").unwrap();
async fn example_observe() {
let client = UDPCoAPClient::new_udp("127.0.0.1:5683").await.unwrap();
client
.observe("/hello/put", |msg| {
println!(
"resource changed {}",
String::from_utf8(msg.payload).unwrap()
);
})
.await
.unwrap();

println!("Press any key to stop...");
Expand Down
84 changes: 84 additions & 0 deletions examples/dtls.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/// This example shows how to use DTLS with coap-rs. If you want to use PKI, please take
/// a look at the test in dtls.rs
extern crate coap;
use coap::client::CoAPClient;
use coap::dtls::DTLSConfig;
use coap::Server;
use coap_lite::{CoapRequest, RequestType as Method};
use std::future::Future;
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::Arc;
use tokio::sync::mpsc;
use webrtc_dtls::cipher_suite::CipherSuiteId;
use webrtc_dtls::config::Config;
use webrtc_dtls::listener::listen;
use webrtc_util::conn::Listener as WebRtcListener;

pub fn spawn_dtls_server<
F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
HandlerRet,
>(
ip: &'static str,
request_handler: F,
config: webrtc_dtls::config::Config,
) -> mpsc::UnboundedReceiver<u16>
where
HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
{
let (tx, rx) = mpsc::unbounded_channel();
tokio::spawn(async move {
let listener = listen(ip, config).await.unwrap();
let listen_port = listener.addr().await.unwrap().port();
let listener = Box::new(listener);
let server = Server::from_listeners(vec![listener]);
tx.send(listen_port).unwrap();
server.run(request_handler).await.unwrap();
});

rx
}

#[tokio::main]
async fn main() {
let config = Config {
psk: Some(Arc::new(|_| Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]))),
psk_identity_hint: Some("webrtc-rs DTLS Server".as_bytes().to_vec()),
cipher_suites: vec![CipherSuiteId::Tls_Psk_With_Aes_128_Ccm_8],
server_name: "localhost".to_string(),
..Default::default()
};
let server_port = spawn_dtls_server(
"127.0.0.1:0",
|mut req| async {
req.response.as_mut().unwrap().message.payload = b"hello dtls".to_vec();
req
},
config.clone(),
)
.recv()
.await
.unwrap();

let dtls_config = DTLSConfig {
config,
dest_addr: ("127.0.0.1", server_port)
.to_socket_addrs()
.unwrap()
.next()
.unwrap(),
};

let mut client = CoAPClient::from_dtls_config(dtls_config)
.await
.expect("could not create client");
let domain = format!("127.0.0.1:{}", server_port);
let resp = client
.request_path("/hello", Method::Get, None, None, Some(domain.to_string()))
.await
.unwrap();
println!(
"receive on client: {}",
std::str::from_utf8(&resp.message.payload).unwrap()
);
assert_eq!(resp.message.payload, b"hello dtls".to_vec());
}
44 changes: 21 additions & 23 deletions examples/echo.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,34 @@
extern crate coap;

use coap::{CoAPClient, Server};
use std::thread;
use tokio::runtime::Runtime;
use std::net::SocketAddr;

fn main() {
thread::spawn(move || {
Runtime::new().unwrap().block_on(async move {
let mut server = Server::new("127.0.0.1:5683").unwrap();
use coap::{Server, UDPCoAPClient};
use coap_lite::CoapRequest;
#[tokio::main]
async fn main() {
let _server_task = tokio::spawn(async move {
let server = Server::new_udp("127.0.0.1:5683").unwrap();
server
.run(|mut request: Box<CoapRequest<SocketAddr>>| async {
let uri_path = request.get_path().to_string();

server
.run(|request| async {
let uri_path = request.get_path().to_string();

return match request.response {
Some(mut response) => {
response.message.payload = uri_path.as_bytes().to_vec();
Some(response)
}
_ => None,
};
})
.await
.unwrap();
});
match request.response {
Some(ref mut response) => {
response.message.payload = uri_path.as_bytes().to_vec();
}
_ => {}
};
return request;
})
.await
.unwrap();
});

let url = "coap://127.0.0.1:5683/Rust";
println!("Client request: {}", url);

// Maybe need sleep seconds before start client on some OS: https://github.com/Covertness/coap-rs/issues/75
let response = CoAPClient::get(url).unwrap();
let response = UDPCoAPClient::get(url).await.unwrap();
println!(
"Server reply: {}",
String::from_utf8(response.message.payload).unwrap()
Expand Down
Loading

0 comments on commit 3c82e50

Please sign in to comment.