Sharding via inject duplication #7184

Closed
wants to merge 2 commits into from
16 changes: 15 additions & 1 deletion tokio/src/runtime/builder.rs
@@ -64,6 +64,9 @@ pub struct Builder {
/// Only used when not using the current-thread executor.
worker_threads: Option<usize>,

/// TODO(i.erin)
worker_group: usize,

/// Cap on thread usage.
max_blocking_threads: usize,

@@ -297,6 +300,8 @@ impl Builder {
// Default to lazy auto-detection (one thread per CPU core)
worker_threads: None,

worker_group: 1,

max_blocking_threads: 512,

// Default thread name
@@ -428,6 +433,13 @@ impl Builder {
self
}

/// TODO(i.erin)
pub fn worker_group(&mut self, val: usize) -> &mut Self {
assert!(val > 0, "Worker groups cannot be set to 0");
self.worker_group = val;
self
}

/// Specifies the limit for additional threads spawned by the Runtime.
///
/// These threads are used for blocking operations like tasks spawned
@@ -1641,8 +1653,9 @@ cfg_rt_multi_thread! {
use crate::runtime::scheduler::{self, MultiThread};

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
let core_groups = self.worker_group;

let (driver, driver_handle) = driver::Driver::new(self.get_cfg(core_threads))?;
let (driver, driver_handle) = driver::Driver::new(self.get_cfg(core_threads * core_groups))?;

// Create the blocking pool
let blocking_pool =
@@ -1655,6 +1668,7 @@

let (scheduler, handle, launch) = MultiThread::new(
core_threads,
core_groups,
driver,
driver_handle,
blocking_spawner,
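The `worker_group` option added above is still undocumented in this diff (its doc comment is a TODO), so the following usage sketch only illustrates how the builder API would be called if it landed as shown; the value 2 and its interpretation as two injection-queue shards are assumptions, not documented behavior.

// Sketch only: `worker_group` exists only in this PR, not in released tokio.
let rt = tokio::runtime::Builder::new_multi_thread()
    .worker_threads(4)
    // Assumed meaning: shard the inject (global) queue into two groups.
    .worker_group(2)
    .enable_all()
    .build()
    .unwrap();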
12 changes: 6 additions & 6 deletions tokio/src/runtime/metrics/runtime.rs
@@ -87,13 +87,13 @@ impl RuntimeMetrics {
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.global_queue_depth();
/// // TODO(i.erin)
/// let n = metrics.global_queue_depth(0);
/// println!("{} tasks currently pending in the runtime's global queue", n);
/// }
/// ```
pub fn global_queue_depth(&self) -> usize {
self.handle.inner.injection_queue_depth()
pub fn global_queue_depth(&self, group: usize) -> usize {
self.handle.inner.injection_queue_depth(group)
}

cfg_unstable_metrics! {
@@ -684,8 +684,8 @@ impl RuntimeMetrics {
/// Renamed to [`RuntimeMetrics::global_queue_depth`]
#[deprecated = "Renamed to global_queue_depth"]
#[doc(hidden)]
pub fn injection_queue_depth(&self) -> usize {
self.handle.inner.injection_queue_depth()
pub fn injection_queue_depth(&self, group: usize) -> usize {
self.handle.inner.injection_queue_depth(group)
}

/// Returns the number of tasks currently scheduled in the given worker's
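With the signature change above, `global_queue_depth` reports the depth of a single group's inject queue. Callers wanting the old aggregate figure would have to sum over groups; the helper below is a sketch invented for illustration, assuming groups are indexed 0..n where n matches the builder's `worker_group` value.

// Hypothetical helper, not part of this diff.
fn total_global_queue_depth(metrics: &tokio::runtime::RuntimeMetrics, groups: usize) -> usize {
    (0..groups).map(|g| metrics.global_queue_depth(g)).sum()
}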
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/current_thread/mod.rs
@@ -567,7 +567,7 @@ impl Handle {
self.shared.owned.num_alive_tasks()
}

pub(crate) fn injection_queue_depth(&self) -> usize {
pub(crate) fn injection_queue_depth(&self, _group: usize) -> usize {
self.shared.inject.len()
}
}
23 changes: 22 additions & 1 deletion tokio/src/runtime/scheduler/inject.rs
@@ -1,5 +1,6 @@
//! Inject queue used to send wakeups to a work-stealing scheduler


use crate::loom::sync::Mutex;
use crate::runtime::task;

@@ -36,7 +37,7 @@ impl<T: 'static> Inject<T> {
}

// Kind of annoying to have to include the cfg here
#[cfg(tokio_taskdump)]
#[cfg(any(tokio_taskdump, feature = "rt-multi-thread"))]
pub(crate) fn is_closed(&self) -> bool {
let synced = self.synced.lock();
self.shared.is_closed(&synced)
@@ -67,4 +68,24 @@ impl<T: 'static> Inject<T> {
// safety: passing correct `Synced`
unsafe { self.shared.pop(&mut synced) }
}

cfg_rt_multi_thread! {
pub(crate) fn is_empty(&self) -> bool {
self.shared.is_empty()
}

pub(crate) fn synced(&self) -> &Mutex<Synced> {
&self.synced
}

pub(crate) fn shared(&self) -> &Shared<T> {
&self.shared
}

pub(crate) fn push_batch<I>(&self, iter: I)
where I: Iterator<Item = task::Notified<T>> {
let mut sync = self.synced.lock();
unsafe { self.shared.push_batch(sync.as_mut(), iter); }
}
}
}
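The new `push_batch` method locks the queue's `Synced` state once and appends the whole batch of notified tasks. A hypothetical in-crate call site, assuming only the names introduced in this diff:

// Sketch only: spill a collected batch of tasks into one inject shard.
fn spill_into_shard<T: 'static>(shard: &Inject<T>, batch: Vec<task::Notified<T>>) {
    if !batch.is_empty() {
        shard.push_batch(batch.into_iter());
    }
}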
4 changes: 2 additions & 2 deletions tokio/src/runtime/scheduler/mod.rs
@@ -233,8 +233,8 @@ cfg_rt! {
match_flavor!(self, Handle(handle) => handle.num_alive_tasks())
}

pub(crate) fn injection_queue_depth(&self) -> usize {
match_flavor!(self, Handle(handle) => handle.injection_queue_depth())
pub(crate) fn injection_queue_depth(&self, group: usize) -> usize {
match_flavor!(self, Handle(handle) => handle.injection_queue_depth(group))
}
}

4 changes: 2 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs
@@ -13,8 +13,8 @@ impl Handle {
self.shared.owned.num_alive_tasks()
}

pub(crate) fn injection_queue_depth(&self) -> usize {
self.shared.injection_queue_depth()
pub(crate) fn injection_queue_depth(&self, group: usize) -> usize {
self.shared.injection_queue_depth(group)
}

cfg_unstable_metrics! {
3 changes: 2 additions & 1 deletion tokio/src/runtime/scheduler/multi_thread/mod.rs
@@ -7,7 +7,6 @@ mod handle;
pub(crate) use handle::Handle;

mod overflow;
pub(crate) use overflow::Overflow;

mod idle;
use self::idle::Idle;
@@ -56,6 +55,7 @@ pub(crate) struct MultiThread;
impl MultiThread {
pub(crate) fn new(
size: usize,
group: usize,
driver: Driver,
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
@@ -65,6 +65,7 @@ impl MultiThread {
let parker = Parker::new(driver);
let (handle, launch) = worker::create(
size,
group,
parker,
driver_handle,
blocking_spawner,
19 changes: 9 additions & 10 deletions tokio/src/runtime/scheduler/multi_thread/overflow.rs
@@ -3,24 +3,23 @@ use crate::runtime::task;
#[cfg(test)]
use std::cell::RefCell;

pub(crate) trait Overflow<T: 'static> {
fn push(&self, task: task::Notified<T>);
pub(crate) trait OverflowShard<T: 'static> {
fn push(&self, task: task::Notified<T>, group: usize);

fn push_batch<I>(&self, iter: I)
where
I: Iterator<Item = task::Notified<T>>;
fn push_batch<I>(&self, iter: I, group: usize)
where I: Iterator<Item = task::Notified<T>>;
}

#[cfg(test)]
impl<T: 'static> Overflow<T> for RefCell<Vec<task::Notified<T>>> {
fn push(&self, task: task::Notified<T>) {
self.borrow_mut().push(task);
impl<T: 'static> OverflowShard<T> for RefCell<Vec<Vec<task::Notified<T>>>> {
fn push(&self, task: task::Notified<T>, group: usize) {
self.borrow_mut()[group].push(task);
}

fn push_batch<I>(&self, iter: I)
fn push_batch<I>(&self, iter: I, group: usize)
where
I: Iterator<Item = task::Notified<T>>,
{
self.borrow_mut().extend(iter);
self.borrow_mut()[group].extend(iter);
}
}
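The renamed trait routes every push to a group index, but this hunk does not show how the runtime implements it outside of the test impl. Purely as an illustration of the intended shape, a per-group implementation over a vector of `Inject` queues (using the `push_batch` helper added earlier in this diff) might look like the sketch below; it is an assumption, not code from the PR.

// Hypothetical sketch, not part of this diff: one Inject queue per worker group.
impl<T: 'static> OverflowShard<T> for Vec<Inject<T>> {
    fn push(&self, task: task::Notified<T>, group: usize) {
        self[group].push(task);
    }

    fn push_batch<I>(&self, iter: I, group: usize)
    where
        I: Iterator<Item = task::Notified<T>>,
    {
        self[group].push_batch(iter);
    }
}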
14 changes: 8 additions & 6 deletions tokio/src/runtime/scheduler/multi_thread/queue.rs
@@ -2,7 +2,7 @@

use crate::loom::cell::UnsafeCell;
use crate::loom::sync::Arc;
use crate::runtime::scheduler::multi_thread::{Overflow, Stats};
use crate::runtime::scheduler::multi_thread::{overflow::OverflowShard, Stats};
use crate::runtime::task;

use std::mem::{self, MaybeUninit};
@@ -182,9 +182,10 @@ impl<T> Local<T> {
/// When the queue overflows, half of the current contents of the queue is
/// moved to the given Injection queue. This frees up capacity for more
/// tasks to be pushed into the local queue.
pub(crate) fn push_back_or_overflow<O: Overflow<T>>(
pub(crate) fn push_back_or_overflow<O: OverflowShard<T>>(
&mut self,
mut task: task::Notified<T>,
group: usize,
overflow: &O,
stats: &mut Stats,
) {
@@ -201,12 +202,12 @@
} else if steal != real {
// Concurrently stealing, this will free up capacity, so only
// push the task onto the inject queue
overflow.push(task);
overflow.push(task, group);
return;
} else {
// Push the current task and half of the queue into the
// inject queue.
match self.push_overflow(task, real, tail, overflow, stats) {
match self.push_overflow(task, group, real, tail, overflow, stats) {
Ok(_) => return,
// Lost the race, try again
Err(v) => {
@@ -247,9 +248,10 @@ impl<T> Local<T> {
/// workers "missed" some of the tasks during a steal, they will get
/// another opportunity.
#[inline(never)]
fn push_overflow<O: Overflow<T>>(
fn push_overflow<O: OverflowShard<T>>(
&mut self,
task: task::Notified<T>,
group: usize,
head: UnsignedShort,
tail: UnsignedShort,
overflow: &O,
@@ -333,7 +335,7 @@ impl<T> Local<T> {
head: head as UnsignedLong,
i: 0,
};
overflow.push_batch(batch_iter.chain(std::iter::once(task)));
overflow.push_batch(batch_iter.chain(std::iter::once(task)), group);

// Add 1 to factor in the task currently being scheduled.
stats.incr_overflow_count();
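For completeness, a hypothetical call site for the new `push_back_or_overflow` signature, showing how a worker's group index would thread through to its overflow shard; the function and its name are illustrative only.

// Sketch only: on overflow, half of the local queue plus `task` spill into
// the shard identified by `group`.
fn schedule_local<T: 'static, O: OverflowShard<T>>(
    local: &mut Local<T>,
    task: task::Notified<T>,
    group: usize,
    overflow: &O,
    stats: &mut Stats,
) {
    local.push_back_or_overflow(task, group, overflow, stats);
}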