diff --git a/src/client.rs b/src/client.rs index 7f4d1271d..23d61a953 100644 --- a/src/client.rs +++ b/src/client.rs @@ -6,7 +6,7 @@ use alloc::vec::Vec; use coap_lite::{ block_handler::{extending_splice, BlockValue}, error::HandlingError, - CoapOption, CoapRequest, CoapResponse, MessageClass, MessageType, ObserveOption, Packet, + CoapOption, CoapRequest, CoapResponse, MessageClass, MessageType, ObserveOption, Packet as Message, RequestType as Method, ResponseType as Status, }; use core::mem; @@ -37,6 +37,12 @@ use tokio::{ use url::Url; const DEFAULT_RECEIVE_TIMEOUT_SECONDS: u64 = 2; // 2s +#[derive(Debug, Clone)] +pub struct Packet { + pub address: Option, + pub message: Message, +} + #[derive(Debug)] pub enum ObserveMessage { Terminate, @@ -49,7 +55,7 @@ use async_trait::async_trait; /// timeouts and retries do not need to be implemented by the transport /// if confirmable messages are sent pub trait ClientTransport: Send + Sync { - async fn recv(&self, buf: &mut [u8]) -> std::io::Result; + async fn recv(&self, buf: &mut [u8]) -> std::io::Result<(usize, Option)>; async fn send(&self, buf: &[u8]) -> std::io::Result; } @@ -60,9 +66,11 @@ trait TransportExt { impl TransportExt for T { async fn receive_packet(&self) -> IoResult> { let mut buf = [0; 1500]; - let nread = self.recv(&mut buf).await?; - let parse_opt = Packet::from_bytes(&buf[..nread]).ok(); - return Ok(parse_opt); + let (nread, address) = self.recv(&mut buf).await?; + return match Message::from_bytes(&buf[..nread]).ok() { + Some(message) => Ok(Some(Packet {address, message})), + None => Ok(None), + } } } @@ -163,16 +171,16 @@ async fn receive_loop( transport_instance.send(&ack).await?; } - let MessageClass::Response(_) = packet.header.code else { + let MessageClass::Response(_) = packet.message.header.code else { continue; }; - let token = packet.get_token(); + let token = packet.message.get_token(); let Some(sender) = transport_sync.get_sender(token).await else { info!("received unexpected response for token {:?}", &token); continue; }; - match packet.header.code { + match packet.message.header.code { MessageClass::Response(_) => {} m => { debug!("unknown message type {}", m); @@ -191,16 +199,16 @@ async fn receive_loop( } pub fn parse_for_ack(packet: &Packet) -> Option> { - match (packet.header.get_type(), packet.header.code) { + match (packet.message.header.get_type(), packet.message.header.code) { (MessageType::Confirmable, MessageClass::Response(_)) => Some(make_ack(packet)), _ => None, } } pub fn make_ack(packet: &Packet) -> Vec { - let mut ack = Packet::new(); + let mut ack = Message::new(); ack.header.set_type(MessageType::Acknowledgement); - ack.header.message_id = packet.header.message_id; + ack.header.message_id = packet.message.header.message_id; ack.header.code = MessageClass::Empty; return ack.to_bytes().unwrap(); } @@ -226,9 +234,9 @@ impl Clone for CoapClientTransport { impl CoapClientTransport { pub const DEFAULT_NUM_RETRIES: usize = 5; - async fn establish_receiver_for(&self, msg: &Packet) -> UnboundedReceiver> { + async fn establish_receiver_for(&self, packet: &Packet) -> UnboundedReceiver> { let (tx, rx) = unbounded_channel(); - let token = msg.get_token().to_owned(); + let token = packet.message.get_token().to_owned(); self.synchronizer.set_sender(token, tx).await; return rx; } @@ -249,8 +257,8 @@ impl CoapClientTransport { return res; } - fn encode_packet(packet: &Packet) -> IoResult> { - packet + fn encode_message(message: &Message) -> IoResult> { + message .to_bytes() .map_err(|e| std::io::Error::new(ErrorKind::InvalidData, e.to_string())) } @@ -260,7 +268,7 @@ impl CoapClientTransport { msg: &Packet, receiver: &mut UnboundedReceiver>, ) -> IoResult { - let bytes = Self::encode_packet(msg)?; + let bytes = Self::encode_message(&msg.message)?; self.transport.send(&bytes).await?; let try_receive: Result>, tokio::time::error::Elapsed> = timeout(self.timeout, receiver.recv()).await; @@ -275,7 +283,7 @@ impl CoapClientTransport { packet: &Packet, receiver: &mut UnboundedReceiver>, ) -> IoResult { - if packet.header.get_type() == MessageType::Confirmable { + if packet.message.header.get_type() == MessageType::Confirmable { return self.try_send_confirmable_message(&packet, receiver).await; } else { return self @@ -289,7 +297,7 @@ impl CoapClientTransport { let result = self .do_request_response_for_packet_inner(packet, &mut receiver) .await; - self.synchronizer.remove_sender(packet.get_token()).await; + self.synchronizer.remove_sender(packet.message.get_token()).await; result } @@ -309,11 +317,11 @@ pub struct UdpTransport { } #[async_trait] impl ClientTransport for UdpTransport { - async fn recv(&self, buf: &mut [u8]) -> std::io::Result { - self.socket + async fn recv(&self, buf: &mut [u8]) -> std::io::Result<(usize, Option)> { + let (read, addr) = self.socket .recv_from(buf) - .await - .map(|(recv_size, _addr)| recv_size) + .await?; + return Ok((read, Some(addr))); } async fn send(&self, buf: &[u8]) -> std::io::Result { self.socket.send_to(buf, self.peer_addr).await @@ -481,7 +489,7 @@ impl UdpCoAPClient { /// client.send_all_coap(&request, segment).await.unwrap(); /// loop { /// let recv_packet = receiver.receive().await.unwrap(); - /// assert_eq!(recv_packet.payload, b"test-echo".to_vec()); + /// assert_eq!(recv_packet.message.payload, b"test-echo".to_vec()); /// } /// } /// ``` @@ -591,7 +599,7 @@ impl CoAPClient { } /// Execute a single request (GET, POST, PUT, DELETE) with a coap url and a specfic timeout - /// using udp + /// using udp pub async fn request_with_timeout( url: &str, method: Method, @@ -619,7 +627,7 @@ impl CoAPClient { self.receive(&mut request).await } - pub async fn observe( + pub async fn observe( &self, resource_path: &str, handler: H, @@ -634,7 +642,7 @@ impl CoAPClient { /// Observe a resource with the handler and specified timeout using the given transport. /// Use the oneshot sender to cancel observation. If this sender is dropped without explicitly /// cancelling it, the observation will continue forever. - pub async fn observe_with_timeout( + pub async fn observe_with_timeout( &mut self, resource_path: &str, handler: H, @@ -651,7 +659,7 @@ impl CoAPClient { /// Use this method if you need to set some specific options in your /// requests. This method will add observe flags and a message id as a fallback /// Use this method if you plan on re-using the same client for requests - pub async fn observe_with( + pub async fn observe_with( &self, request: CoapRequest, mut handler: H, @@ -723,17 +731,17 @@ impl CoAPClient { let _ = self .transport - .do_request_response_for_packet(&deregister_packet.message) + .do_request_response_for_packet(&Packet {address:None, message: deregister_packet.message}) .await; } - async fn receive_and_handle_message_observe( + async fn receive_and_handle_message_observe( socket_result: IoResult, handler: &mut H, ) { match socket_result { Ok(packet) => { - handler(packet); + handler(packet.message); } Err(e) => match e.kind() { ErrorKind::WouldBlock => { @@ -755,9 +763,9 @@ impl CoAPClient { ) -> IoResult { let response = self .transport - .do_request_response_for_packet(&request.message) + .do_request_response_for_packet(&Packet {address:None, message:request.message.to_owned()}) .await?; - Ok(CoapResponse { message: response }) + Ok(CoapResponse { message: response.message }) } /// low-level method to send a a request supporting block1 option based on @@ -1214,7 +1222,7 @@ mod test { #[async_trait] impl ClientTransport for FaultyUdp { - async fn recv(&self, buf: &mut [u8]) -> std::io::Result { + async fn recv(&self, buf: &mut [u8]) -> std::io::Result<(usize, Option)> { self.udp.recv(buf).await } @@ -1381,7 +1389,7 @@ mod test { } #[async_trait] impl ClientTransport for FaultyReceiver { - async fn recv(&self, buf: &mut [u8]) -> std::io::Result { + async fn recv(&self, buf: &mut [u8]) -> std::io::Result<(usize, Option)> { let mut mutex = self.should_fail.lock().await; tokio::select! { e = mutex.deref_mut() => { diff --git a/src/dtls.rs b/src/dtls.rs index aee905d07..43282f0fe 100644 --- a/src/dtls.rs +++ b/src/dtls.rs @@ -42,13 +42,13 @@ pub struct DtlsResponse { #[async_trait] impl ClientTransport for DtlsConnection { - async fn recv(&self, buf: &mut [u8]) -> IoResult { + async fn recv(&self, buf: &mut [u8]) -> IoResult<(usize, Option)> { let read = self .conn .read(buf, None) .await .map_err(|e| Error::new(ErrorKind::Other, e))?; - return Ok(read); + return Ok((read, self.conn.remote_addr())); } async fn send(&self, buf: &[u8]) -> IoResult { @@ -108,7 +108,7 @@ pub struct DtlsConnection { } impl DtlsConnection { - /// Creates a new DTLS connection from a given connection. This connection can be + /// Creates a new DTLS connection from a given connection. This connection can be /// a tokio UDP socket or a user-created struct implementing Conn, Send, and Sync /// /// @@ -187,7 +187,7 @@ mod test { use rcgen::KeyPair; use std::fs::File; use std::io::{BufReader, Read}; - use std::net::{SocketAddr, ToSocketAddrs}; + use std::net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs}; use std::sync::atomic::AtomicBool; use tokio::sync::mpsc; use tokio::time::sleep; @@ -635,7 +635,7 @@ mod test { todo!("not needed"); } fn remote_addr(&self) -> Option { - todo!("not needed") + Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0)) } async fn close(&self) -> WebrtcResult<()> { Ok(self.0.close().await?) diff --git a/src/server.rs b/src/server.rs index 4689cbfe9..38ad2fd34 100644 --- a/src/server.rs +++ b/src/server.rs @@ -817,7 +817,7 @@ pub mod test { let mut receiver = client.create_receiver_for(&request).await; client.send_all_coap(&request, segment).await.unwrap(); let recv_packet = receiver.receive().await.unwrap(); - assert_eq!(recv_packet.payload, b"test-echo".to_vec()); + assert_eq!(recv_packet.message.payload, b"test-echo".to_vec()); } //This test right now does not work on windows @@ -872,7 +872,7 @@ pub mod test { let mut receiver = client.create_receiver_for(&request).await; client.send_all_coap(&request, segment).await.unwrap(); let recv_packet = receiver.receive().await.unwrap(); - assert_eq!(recv_packet.payload, b"test-echo".to_vec()); + assert_eq!(recv_packet.message.payload, b"test-echo".to_vec()); } #[test]