Skip to content

Commit

Permalink
waker: remove Mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
dr-orlovsky committed Apr 14, 2023
1 parent e03dd91 commit 5a331ea
Showing 1 changed file with 7 additions and 16 deletions.
23 changes: 7 additions & 16 deletions src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::fmt::{Debug, Display, Formatter};
use std::io::Write;
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net::UnixStream;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Duration;
use std::{io, thread};
Expand Down Expand Up @@ -284,7 +284,7 @@ impl<C> Reactor<C> {

let controller = Controller {
ctl_send,
waker: Arc::new(Mutex::new(waker_writer)),
waker: Arc::new(waker_writer),
};

#[cfg(feature = "log")]
Expand Down Expand Up @@ -343,7 +343,7 @@ enum Ctl<C> {
/// [`Handler::Command`] for the details).
pub struct Controller<C> {
ctl_send: chan::Sender<Ctl<C>>,
waker: Arc<Mutex<UnixStream>>,
waker: Arc<UnixStream>,
}

impl<C> Clone for Controller<C> {
Expand Down Expand Up @@ -400,23 +400,14 @@ impl<C> Controller<C> {
#[cfg(feature = "log")]
log::trace!(target: "reactor-controller", "Wakening the reactor");

let mut waker = loop {
match self.waker.lock() {
Err(err) => {
#[cfg(feature = "log")]
log::error!(target: "reactor-controller", "Waker lock is poisoned: {err}");
}
Ok(waker) => break waker,
}
};

loop {
match waker.write_all(&[0x1]) {
let mut waker = self.waker.as_ref();
match (&mut waker).write_all(&[0x1]) {
Ok(_) => return Ok(()),
Err(e) if e.kind() == WouldBlock => {
#[cfg(feature = "log")]
log::error!(target: "reactor-controller", "Waker write queue got overfilled, resetting and repeating...");
reset_fd(&waker.as_raw_fd())?;
reset_fd(&self.waker.as_raw_fd())?;
}
Err(e) if e.kind() == Interrupted => {
#[cfg(feature = "log")]
Expand Down Expand Up @@ -489,7 +480,7 @@ impl<H: Handler, P: Poll> Runtime<H, P> {

let controller = Controller {
ctl_send,
waker: Arc::new(Mutex::new(waker_writer)),
waker: Arc::new(waker_writer),
};

Ok(Runtime {
Expand Down

0 comments on commit 5a331ea

Please sign in to comment.