From 759258b6bc253e8695f468dc2472187a7e96b86d Mon Sep 17 00:00:00 2001 From: Michiel De Witte Date: Thu, 20 Jun 2024 23:22:20 +0200 Subject: [PATCH] refactor with renaming coap-lite Packet to Message --- src/client.rs | 117 ++++++++++++++++++++++++-------------------------- src/dtls.rs | 11 +++-- src/server.rs | 4 +- 3 files changed, 64 insertions(+), 68 deletions(-) diff --git a/src/client.rs b/src/client.rs index 84898c940..23d61a953 100644 --- a/src/client.rs +++ b/src/client.rs @@ -6,7 +6,7 @@ use alloc::vec::Vec; use coap_lite::{ block_handler::{extending_splice, BlockValue}, error::HandlingError, - CoapOption, CoapRequest, CoapResponse, MessageClass, MessageType, ObserveOption, Packet, + CoapOption, CoapRequest, CoapResponse, MessageClass, MessageType, ObserveOption, Packet as Message, RequestType as Method, ResponseType as Status, }; use core::mem; @@ -38,9 +38,9 @@ use url::Url; const DEFAULT_RECEIVE_TIMEOUT_SECONDS: u64 = 2; // 2s #[derive(Debug, Clone)] -pub struct AddressedPacket { - pub address: SocketAddr, - pub packet: Packet, +pub struct Packet { + pub address: Option, + pub message: Message, } #[derive(Debug)] @@ -55,21 +55,20 @@ use async_trait::async_trait; /// timeouts and retries do not need to be implemented by the transport /// if confirmable messages are sent pub trait ClientTransport: Send + Sync { - async fn recv(&self, buf: &mut [u8]) -> std::io::Result<(usize, SocketAddr)>; + async fn recv(&self, buf: &mut [u8]) -> std::io::Result<(usize, Option)>; async fn send(&self, buf: &[u8]) -> std::io::Result; } trait TransportExt { - async fn receive_packet(&self) -> IoResult>; + async fn receive_packet(&self) -> IoResult>; } impl TransportExt for T { - async fn receive_packet(&self) -> IoResult> { + async fn receive_packet(&self) -> IoResult> { let mut buf = [0; 1500]; let (nread, address) = self.recv(&mut buf).await?; - // let parse_opt = Packet::from_bytes(&buf[..nread]).ok(); - return match Packet::from_bytes(&buf[..nread]).ok() { - Some(packet) => Ok(Some(AddressedPacket {address, packet})), + return match Message::from_bytes(&buf[..nread]).ok() { + Some(message) => Ok(Some(Packet {address, message})), None => Ok(None), } } @@ -77,7 +76,7 @@ impl TransportExt for T { /// we only use the token as the identifier, and an empty token to represent empty requests type Token = Vec; -type PacketRegistry = BTreeMap>>; +type PacketRegistry = BTreeMap>>; #[derive(Clone)] pub struct TransportSynchronizer { @@ -93,7 +92,7 @@ impl TransportSynchronizer { } } - async fn check_for_error(&self, sender: &UnboundedSender>) -> Option<()> { + async fn check_for_error(&self, sender: &UnboundedSender>) -> Option<()> { self.fail_error.read().await.as_ref(); if let Some(err) = self.fail_error.read().await.as_ref() { let _ = sender.send(Err(Self::clone_err(err))); @@ -120,7 +119,7 @@ impl TransportSynchronizer { } } - pub async fn get_sender(&self, key: &[u8]) -> Option>> { + pub async fn get_sender(&self, key: &[u8]) -> Option>> { self.outgoing .lock() .await @@ -132,12 +131,12 @@ impl TransportSynchronizer { pub async fn set_sender( &self, key: Vec, - sender: UnboundedSender>, - ) -> Option>> { + sender: UnboundedSender>, + ) -> Option>> { self.check_for_error(&sender).await?; self.outgoing.lock().await.insert(key, sender) } - pub async fn remove_sender(&self, key: &[u8]) -> Option>> { + pub async fn remove_sender(&self, key: &[u8]) -> Option>> { self.outgoing.lock().await.remove(key) } } @@ -172,16 +171,16 @@ async fn receive_loop( transport_instance.send(&ack).await?; } - let MessageClass::Response(_) = packet.packet.header.code else { + let MessageClass::Response(_) = packet.message.header.code else { continue; }; - let token = packet.packet.get_token(); + let token = packet.message.get_token(); let Some(sender) = transport_sync.get_sender(token).await else { info!("received unexpected response for token {:?}", &token); continue; }; - match packet.packet.header.code { + match packet.message.header.code { MessageClass::Response(_) => {} m => { debug!("unknown message type {}", m); @@ -199,17 +198,17 @@ async fn receive_loop( return e; } -pub fn parse_for_ack(packet: &AddressedPacket) -> Option> { - match (packet.packet.header.get_type(), packet.packet.header.code) { +pub fn parse_for_ack(packet: &Packet) -> Option> { + match (packet.message.header.get_type(), packet.message.header.code) { (MessageType::Confirmable, MessageClass::Response(_)) => Some(make_ack(packet)), _ => None, } } -pub fn make_ack(packet: &AddressedPacket) -> Vec { - let mut ack = Packet::new(); +pub fn make_ack(packet: &Packet) -> Vec { + let mut ack = Message::new(); ack.header.set_type(MessageType::Acknowledgement); - ack.header.message_id = packet.packet.header.message_id; + ack.header.message_id = packet.message.header.message_id; ack.header.code = MessageClass::Empty; return ack.to_bytes().unwrap(); } @@ -235,9 +234,9 @@ impl Clone for CoapClientTransport { impl CoapClientTransport { pub const DEFAULT_NUM_RETRIES: usize = 5; - async fn establish_receiver_for(&self, msg: &Packet) -> UnboundedReceiver> { + async fn establish_receiver_for(&self, packet: &Packet) -> UnboundedReceiver> { let (tx, rx) = unbounded_channel(); - let token = msg.get_token().to_owned(); + let token = packet.message.get_token().to_owned(); self.synchronizer.set_sender(token, tx).await; return rx; } @@ -246,8 +245,8 @@ impl CoapClientTransport { async fn try_send_confirmable_message( &self, msg: &Packet, - receiver: &mut UnboundedReceiver>, - ) -> IoResult { + receiver: &mut UnboundedReceiver>, + ) -> IoResult { let mut res = Err(Error::new(ErrorKind::InvalidData, "not enough retries")); for _ in 0..self.retries { res = self.try_send_non_confirmable_message(&msg, receiver).await; @@ -258,8 +257,8 @@ impl CoapClientTransport { return res; } - fn encode_packet(packet: &Packet) -> IoResult> { - packet + fn encode_message(message: &Message) -> IoResult> { + message .to_bytes() .map_err(|e| std::io::Error::new(ErrorKind::InvalidData, e.to_string())) } @@ -267,11 +266,11 @@ impl CoapClientTransport { async fn try_send_non_confirmable_message( &self, msg: &Packet, - receiver: &mut UnboundedReceiver>, - ) -> IoResult { - let bytes = Self::encode_packet(msg)?; + receiver: &mut UnboundedReceiver>, + ) -> IoResult { + let bytes = Self::encode_message(&msg.message)?; self.transport.send(&bytes).await?; - let try_receive: Result>, tokio::time::error::Elapsed> = + let try_receive: Result>, tokio::time::error::Elapsed> = timeout(self.timeout, receiver.recv()).await; if let Ok(Some(res)) = try_receive { return res; @@ -282,9 +281,9 @@ impl CoapClientTransport { async fn do_request_response_for_packet_inner( &self, packet: &Packet, - receiver: &mut UnboundedReceiver>, - ) -> IoResult { - if packet.header.get_type() == MessageType::Confirmable { + receiver: &mut UnboundedReceiver>, + ) -> IoResult { + if packet.message.header.get_type() == MessageType::Confirmable { return self.try_send_confirmable_message(&packet, receiver).await; } else { return self @@ -298,11 +297,8 @@ impl CoapClientTransport { let result = self .do_request_response_for_packet_inner(packet, &mut receiver) .await; - self.synchronizer.remove_sender(packet.get_token()).await; - match result { - Ok(addr_packet) => Ok(addr_packet.packet), - Err(err) => Err(err), - } + self.synchronizer.remove_sender(packet.message.get_token()).await; + result } pub fn from_transport(transport: Arc, synchronizer: TransportSynchronizer) -> Self { @@ -321,10 +317,11 @@ pub struct UdpTransport { } #[async_trait] impl ClientTransport for UdpTransport { - async fn recv(&self, buf: &mut [u8]) -> std::io::Result<(usize, SocketAddr)> { - self.socket + async fn recv(&self, buf: &mut [u8]) -> std::io::Result<(usize, Option)> { + let (read, addr) = self.socket .recv_from(buf) - .await + .await?; + return Ok((read, Some(addr))); } async fn send(&self, buf: &[u8]) -> std::io::Result { self.socket.send_to(buf, self.peer_addr).await @@ -353,12 +350,12 @@ impl Clone for CoAPClient { /// a receiver used whenever you have a use case involving multiple responses to a single request pub struct MessageReceiver { synchronizer: TransportSynchronizer, - receiver: UnboundedReceiver>, + receiver: UnboundedReceiver>, token: Vec, } impl MessageReceiver { - pub async fn receive(&mut self) -> IoResult { + pub async fn receive(&mut self) -> IoResult { match self.receiver.recv().await { Some(Ok(packet)) => Ok(packet), Some(Err(e)) => Err(e), @@ -370,7 +367,7 @@ impl MessageReceiver { } pub fn new( synchronizer: TransportSynchronizer, - receiver: UnboundedReceiver>, + receiver: UnboundedReceiver>, token: &[u8], ) -> Self { Self { @@ -492,7 +489,7 @@ impl UdpCoAPClient { /// client.send_all_coap(&request, segment).await.unwrap(); /// loop { /// let recv_packet = receiver.receive().await.unwrap(); - /// assert_eq!(recv_packet.packet.payload, b"test-echo".to_vec()); + /// assert_eq!(recv_packet.message.payload, b"test-echo".to_vec()); /// } /// } /// ``` @@ -630,7 +627,7 @@ impl CoAPClient { self.receive(&mut request).await } - pub async fn observe( + pub async fn observe( &self, resource_path: &str, handler: H, @@ -645,7 +642,7 @@ impl CoAPClient { /// Observe a resource with the handler and specified timeout using the given transport. /// Use the oneshot sender to cancel observation. If this sender is dropped without explicitly /// cancelling it, the observation will continue forever. - pub async fn observe_with_timeout( + pub async fn observe_with_timeout( &mut self, resource_path: &str, handler: H, @@ -662,7 +659,7 @@ impl CoAPClient { /// Use this method if you need to set some specific options in your /// requests. This method will add observe flags and a message id as a fallback /// Use this method if you plan on re-using the same client for requests - pub async fn observe_with( + pub async fn observe_with( &self, request: CoapRequest, mut handler: H, @@ -734,17 +731,17 @@ impl CoAPClient { let _ = self .transport - .do_request_response_for_packet(&deregister_packet.message) + .do_request_response_for_packet(&Packet {address:None, message: deregister_packet.message}) .await; } - async fn receive_and_handle_message_observe( - socket_result: IoResult, + async fn receive_and_handle_message_observe( + socket_result: IoResult, handler: &mut H, ) { match socket_result { Ok(packet) => { - handler(packet.packet); + handler(packet.message); } Err(e) => match e.kind() { ErrorKind::WouldBlock => { @@ -766,9 +763,9 @@ impl CoAPClient { ) -> IoResult { let response = self .transport - .do_request_response_for_packet(&request.message) + .do_request_response_for_packet(&Packet {address:None, message:request.message.to_owned()}) .await?; - Ok(CoapResponse { message: response }) + Ok(CoapResponse { message: response.message }) } /// low-level method to send a a request supporting block1 option based on @@ -1225,7 +1222,7 @@ mod test { #[async_trait] impl ClientTransport for FaultyUdp { - async fn recv(&self, buf: &mut [u8]) -> std::io::Result<(usize, SocketAddr)> { + async fn recv(&self, buf: &mut [u8]) -> std::io::Result<(usize, Option)> { self.udp.recv(buf).await } @@ -1392,7 +1389,7 @@ mod test { } #[async_trait] impl ClientTransport for FaultyReceiver { - async fn recv(&self, buf: &mut [u8]) -> std::io::Result<(usize, SocketAddr)> { + async fn recv(&self, buf: &mut [u8]) -> std::io::Result<(usize, Option)> { let mut mutex = self.should_fail.lock().await; tokio::select! { e = mutex.deref_mut() => { diff --git a/src/dtls.rs b/src/dtls.rs index de8f2e195..43282f0fe 100644 --- a/src/dtls.rs +++ b/src/dtls.rs @@ -3,7 +3,7 @@ use crate::client::ClientTransport; use crate::server::{Listener, Responder, TransportRequestSender}; use async_trait::async_trait; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::net::SocketAddr; use std::time::Duration; use std::{ io::{Error, ErrorKind, Result as IoResult}, @@ -42,14 +42,13 @@ pub struct DtlsResponse { #[async_trait] impl ClientTransport for DtlsConnection { - async fn recv(&self, buf: &mut [u8]) -> IoResult<(usize, SocketAddr)> { + async fn recv(&self, buf: &mut [u8]) -> IoResult<(usize, Option)> { let read = self .conn .read(buf, None) .await .map_err(|e| Error::new(ErrorKind::Other, e))?; - - return Ok((read, SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0))); + return Ok((read, self.conn.remote_addr())); } async fn send(&self, buf: &[u8]) -> IoResult { @@ -188,7 +187,7 @@ mod test { use rcgen::KeyPair; use std::fs::File; use std::io::{BufReader, Read}; - use std::net::{SocketAddr, ToSocketAddrs}; + use std::net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs}; use std::sync::atomic::AtomicBool; use tokio::sync::mpsc; use tokio::time::sleep; @@ -636,7 +635,7 @@ mod test { todo!("not needed"); } fn remote_addr(&self) -> Option { - todo!("not needed") + Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0)) } async fn close(&self) -> WebrtcResult<()> { Ok(self.0.close().await?) diff --git a/src/server.rs b/src/server.rs index 3d167515c..38ad2fd34 100644 --- a/src/server.rs +++ b/src/server.rs @@ -817,7 +817,7 @@ pub mod test { let mut receiver = client.create_receiver_for(&request).await; client.send_all_coap(&request, segment).await.unwrap(); let recv_packet = receiver.receive().await.unwrap(); - assert_eq!(recv_packet.packet.payload, b"test-echo".to_vec()); + assert_eq!(recv_packet.message.payload, b"test-echo".to_vec()); } //This test right now does not work on windows @@ -872,7 +872,7 @@ pub mod test { let mut receiver = client.create_receiver_for(&request).await; client.send_all_coap(&request, segment).await.unwrap(); let recv_packet = receiver.receive().await.unwrap(); - assert_eq!(recv_packet.packet.payload, b"test-echo".to_vec()); + assert_eq!(recv_packet.message.payload, b"test-echo".to_vec()); } #[test]