diff --git a/crates/core/src/ampc/worker.rs b/crates/core/src/ampc/worker.rs index 5bde34c5..ceb57e4a 100644 --- a/crates/core/src/ampc/worker.rs +++ b/crates/core/src/ampc/worker.rs @@ -98,6 +98,12 @@ where Ok(res) } + fn send_raw_without_timeout(&self, req: &JobReq) -> Result> { + let mut conn = self.conn()?; + let res = block_on(conn.send_without_timeout(req))?; + Ok(res) + } + fn send(&self, req: R) -> R::Response where R: RequestWrapper<::Worker>, @@ -107,6 +113,19 @@ where Resp::User(res) => R::unwrap_response(res).unwrap(), } } + + fn send_without_timeout(&self, req: R) -> R::Response + where + R: RequestWrapper<::Worker>, + { + match self + .send_raw_without_timeout(&Req::User(R::wrap(req))) + .unwrap() + { + Resp::Coordinator(_) => panic!("unexpected coordinator response"), + Resp::User(res) => R::unwrap_response(res).unwrap(), + } + } } pub trait RequestWrapper: Message { diff --git a/crates/core/src/distributed/sonic/mod.rs b/crates/core/src/distributed/sonic/mod.rs index a514cac8..dfef96b8 100644 --- a/crates/core/src/distributed/sonic/mod.rs +++ b/crates/core/src/distributed/sonic/mod.rs @@ -119,7 +119,7 @@ where } } - async fn send_without_timeout(&mut self, request: &Req) -> Result { + pub async fn send_without_timeout(&mut self, request: &Req) -> Result { self.awaiting_res = true; let bytes = bincode::encode_to_vec(request, common::bincode_config()).unwrap(); diff --git a/crates/core/src/entrypoint/ampc/shortest_path/worker.rs b/crates/core/src/entrypoint/ampc/shortest_path/worker.rs index 3fdfc5e8..d280e380 100644 --- a/crates/core/src/entrypoint/ampc/shortest_path/worker.rs +++ b/crates/core/src/entrypoint/ampc/shortest_path/worker.rs @@ -175,7 +175,7 @@ impl RemoteShortestPathWorker { } pub fn sample_nodes(&self, num_nodes: u64) -> Vec { - self.send(SampleNodes(num_nodes)) + self.send_without_timeout(SampleNodes(num_nodes)) } }