Skip to content

Commit

Permalink
Merge branch 'dev' into tninesling/demand-control-opt
Browse files Browse the repository at this point in the history
  • Loading branch information
tninesling authored Jan 6, 2025
2 parents 3b51000 + ef6ae6c commit 8973127
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 20 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -854,9 +854,9 @@ checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de"

[[package]]
name = "async-trait"
version = "0.1.83"
version = "0.1.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd"
checksum = "1b1244b10dcd56c92219da4e14caa97e312079e185f04ba3eea25061561dc0a0"
dependencies = [
"proc-macro2",
"quote",
Expand Down Expand Up @@ -1863,9 +1863,9 @@ dependencies = [

[[package]]
name = "crossbeam-channel"
version = "0.5.13"
version = "0.5.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471"
dependencies = [
"crossbeam-utils",
]
Expand Down
2 changes: 1 addition & 1 deletion apollo-router/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ clap = { version = "4.5.8", default-features = false, features = [
"help",
] }
cookie = { version = "0.18.0", default-features = false }
crossbeam-channel = "0.5"
crossbeam-channel = "0.5.14"
ci_info = { version = "0.14.14", features = ["serde-1"] }
dashmap = { version = "5.5.3", features = ["serde"] }
derivative = "2.2.0"
Expand Down
26 changes: 11 additions & 15 deletions apollo-router/src/ageing_priority_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ where
}

pub(crate) fn receiver(&self) -> Receiver<'_, T> {
let mut select = crossbeam_channel::Select::new();
let mut select = crossbeam_channel::Select::new_biased();
for (_, inner_receiver) in &self.inner_queues {
select.recv(inner_receiver);
}
Expand All @@ -90,20 +90,16 @@ where
T: Send + 'static,
{
pub(crate) fn blocking_recv(&mut self) -> T {
loop {
// Block until something is ready.
// Ignore the returned index because it is "random" when multiple operations are ready.
self.select.ready();
// Check inner channels in priority order instead:
for (index, (_, inner_receiver)) in self.shared.inner_queues.iter().enumerate() {
if let Ok(message) = inner_receiver.try_recv() {
self.shared.queued_count.fetch_sub(1, Ordering::Relaxed);
self.age(index);
return message;
}
}
// Another thread raced us to it or `ready()` returned spuriously, try again
}
// Because we used `Select::new_biased` above,
// `select()` will not shuffle receivers as it would with `Select::new` (for fairness)
// but instead will try each one in priority order.
let selected = self.select.select();
let index = selected.index();
let (_tx, rx) = &self.shared.inner_queues[index];
let item = selected.recv(rx).expect("disconnected channel");
self.shared.queued_count.fetch_sub(1, Ordering::Relaxed);
self.age(index);
item
}

// Promote some messages from priorities lower (higher indices) than `message_consumed_at_index`
Expand Down

0 comments on commit 8973127

Please sign in to comment.