Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement RACK variation for QUIC #1486

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions neqo-transport/src/cc/classic_cc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,12 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
}

// Multi-packet version of OnPacketAckedCC
fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant) {
fn on_packets_acked(
&mut self,
acked_pkts: &[SentPacket],
min_rtt: Duration,
now: Instant,
) -> bool {
// Check whether we are app limited before acked packets are removed
// from bytes_in_flight.
let is_app_limited = self.app_limited();
Expand All @@ -166,6 +171,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
MAX_DATAGRAM_SIZE * PACING_BURST_SIZE,
);

let mut exiting_recovery = false;
let mut new_acked = 0;
for pkt in acked_pkts {
qinfo!(
Expand All @@ -190,6 +196,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {

if self.state.in_recovery() {
self.set_state(State::CongestionAvoidance);
exiting_recovery = true;
qlog::metrics_updated(&mut self.qlog, &[QlogMetric::InRecovery(false)]);
}

Expand All @@ -199,7 +206,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
if is_app_limited {
self.cc_algorithm.on_app_limited();
qinfo!("on_packets_acked this={:p}, limited=1, bytes_in_flight={}, cwnd={}, state={:?}, new_acked={}", self, self.bytes_in_flight, self.congestion_window, self.state, new_acked);
return;
return exiting_recovery;
}

// Slow start, up to the slow start threshold.
Expand Down Expand Up @@ -250,6 +257,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
],
);
qinfo!([self], "on_packets_acked this={:p}, limited=0, bytes_in_flight={}, cwnd={}, state={:?}, new_acked={}", self, self.bytes_in_flight, self.congestion_window, self.state, new_acked);
exiting_recovery
}

/// Update congestion controller state based on lost packets.
Expand Down
7 changes: 6 additions & 1 deletion neqo-transport/src/cc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ pub trait CongestionControl: Display + Debug {
#[must_use]
fn cwnd_avail(&self) -> usize;

fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant);
fn on_packets_acked(
&mut self,
acked_pkts: &[SentPacket],
min_rtt: Duration,
now: Instant,
) -> bool;

/// Returns true if the congestion window was reduced.
fn on_packets_lost(
Expand Down
4 changes: 2 additions & 2 deletions neqo-transport/src/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -957,10 +957,10 @@ impl Path {
}

/// Record packets as acknowledged with the sender.
pub fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], now: Instant) {
pub fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], now: Instant) -> bool {
debug_assert!(self.is_primary());
self.sender
.on_packets_acked(acked_pkts, self.rtt.minimum(), now);
.on_packets_acked(acked_pkts, self.rtt.minimum(), now)
}

/// Record packets as lost with the sender.
Expand Down
85 changes: 78 additions & 7 deletions neqo-transport/src/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,13 @@ pub(crate) struct LossRecoverySpace {
/// This is `None` if there were no out-of-order packets detected.
/// When set to `Some(T)`, time-based loss detection should be enabled.
first_ooo_time: Option<Instant>,
/// If no reordering has been observed, TODO: just say reo_wnd_mult != 0
reordering_seen: bool,
/// the RTO is RTT * (reo_wnd_mult + 9) / 8
///
/// this is basically the index of the first non-zero entry of `reo_wnd_persist`
reorder_window_mult: u32,
reorder_window_persist: u32,
}

impl LossRecoverySpace {
Expand All @@ -197,6 +204,9 @@ impl LossRecoverySpace {
in_flight_outstanding: 0,
sent_packets: BTreeMap::default(),
first_ooo_time: None,
reorder_window_mult: 0,
reorder_window_persist: 0,
reordering_seen: false,
}
}

Expand Down Expand Up @@ -384,18 +394,20 @@ impl LossRecoverySpace {
pub fn detect_lost_packets(
&mut self,
now: Instant,
loss_delay: Duration,
rtt_estimate: Duration,
cleanup_delay: Duration,
lost_packets: &mut Vec<SentPacket>,
) {
// Housekeeping.
self.remove_old_lost(now, cleanup_delay);

let loss_delay = rtt_estimate * (self.reorder_window_mult + 9) / 8;
qtrace!(
"detect lost {}: now={:?} delay={:?}",
"detect lost {}: now={:?} delay={:?}, multiplier={}",
self.space,
now,
loss_delay,
self.reorder_window_mult
);
self.first_ooo_time = None;

Expand All @@ -418,7 +430,7 @@ impl LossRecoverySpace {
packet.time_sent,
loss_delay
);
} else if largest_acked >= Some(*pn + PACKET_THRESHOLD) {
} else if !self.reordering_seen && largest_acked >= Some(*pn + PACKET_THRESHOLD) {
qtrace!(
"lost={}, is >= {} from largest acked {:?}",
pn,
Expand All @@ -438,6 +450,60 @@ impl LossRecoverySpace {

lost_packets.extend(lost_pns.iter().map(|pn| self.sent_packets[pn].clone()));
}

pub fn detect_reordered_packets(
&mut self,
now: Instant,
acked_pkts: &[SentPacket],
rtt_estimate: Duration,
) {
// detect packet reordering
let mut max_rtt = Duration::default();
if let Some(largest_ack) = self.largest_acked {
for pkt in acked_pkts
.iter()
.filter(|pkt| pkt.cc_in_flight() && pkt.pn < largest_ack)
{
qinfo!("detect_reordered_packets largest_ack={}, pn={}", largest_ack, pkt.pn);
// reordering event
self.reordering_seen = true;
max_rtt = max(max_rtt, now.duration_since(pkt.time_sent));
}
}
// update reo_wnd
if max_rtt > rtt_estimate && !rtt_estimate.is_zero() {
// calculate reo_wnd necessary to accept the reordering event
// inverse of
// lost_delay = rtt_estimate * (self.reo_wnd_mult + 9) / 8;
// <=> self.reo_wnd_mult = (lost_delay / rtt_estimate) * 8 - 9
let multiplier = min(
(max_rtt.as_micros() * 8 / rtt_estimate.as_micros()) - 9 + 1,
8,
);
let multiplier = u32::try_from(multiplier).unwrap();
qinfo!(
"detect_reordered_packets max_rtt={}, rtt_estimate={} old_barrier={}, new_barrier={}",
max_rtt.as_micros(),
rtt_estimate.as_micros(),
(rtt_estimate * (self.reorder_window_mult + 9) / 8).as_micros(),
(rtt_estimate * (multiplier + 9) / 8).as_micros()
);
self.reorder_window_mult = max(self.reorder_window_mult, multiplier);
}
}

pub fn on_exiting_recovery(&mut self) {
if self.reorder_window_persist != 0 {
self.reorder_window_persist -= 1;
if self.reorder_window_persist == 0 {
self.reorder_window_mult = 0;
}
}
qinfo!(
"on_exiting_recovery reorder_window_persist={}, reorder_window_mult={}",
self.reorder_window_persist, self.reorder_window_mult
);
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -680,6 +746,9 @@ impl LossRecovery {
return (Vec::new(), Vec::new());
}

let rtt_estimate = primary_path.borrow().rtt().estimated_upper();
space.detect_reordered_packets(now, &acked_packets, rtt_estimate);

// Track largest PN acked per space
let prev_largest_acked = space.largest_acked_sent_time;
if Some(largest_acked) > space.largest_acked {
Expand All @@ -704,12 +773,11 @@ impl LossRecovery {
// We need to ensure that we have sent any PTO probes before they are removed
// as we rely on the count of in-flight packets to determine whether to send
// another probe. Removing them too soon would result in not sending on PTO.
let loss_delay = primary_path.borrow().rtt().loss_delay();
let cleanup_delay = self.pto_period(primary_path.borrow().rtt(), pn_space);
let mut lost = Vec::new();
self.spaces.get_mut(pn_space).unwrap().detect_lost_packets(
now,
loss_delay,
rtt_estimate,
cleanup_delay,
&mut lost,
);
Expand All @@ -725,9 +793,12 @@ impl LossRecovery {
// This must happen after on_packets_lost. If in recovery, this could
// take us out, and then lost packets will start a new recovery period
// when it shouldn't.
primary_path
if primary_path
.borrow_mut()
.on_packets_acked(&acked_packets, now);
.on_packets_acked(&acked_packets, now)
{
self.spaces.get_mut(pn_space).unwrap().on_exiting_recovery();
}

self.pto_state = None;

Expand Down
10 changes: 10 additions & 0 deletions neqo-transport/src/rtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,16 @@ impl RttEstimate {
max(rtt * 9 / 8, GRANULARITY)
}

/// Frin RFC9002 Section 6.1.2 Time Treshhold
/// Using `max(smoothed_rtt, latest_rtt)` protects from the two following cases:
// * the latest RTT sample is lower than the smoothed RTT, perhaps due to reordering where the
// acknowledgment encountered a shorter path;
// * the latest RTT sample is higher than the smoothed RTT, perhaps due to a sustained
// increase in the actual RTT, but the smoothed RTT has not yet caught up.
pub fn estimated_upper(&self) -> Duration {
max(self.latest_rtt, self.smoothed_rtt)
}

pub fn first_sample_time(&self) -> Option<Instant> {
self.first_sample_time
}
Expand Down
9 changes: 7 additions & 2 deletions neqo-transport/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,13 @@ impl PacketSender {
self.cc.cwnd_avail()
}

pub fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant) {
self.cc.on_packets_acked(acked_pkts, min_rtt, now);
pub fn on_packets_acked(
&mut self,
acked_pkts: &[SentPacket],
min_rtt: Duration,
now: Instant,
) -> bool {
self.cc.on_packets_acked(acked_pkts, min_rtt, now)
}

/// Called when packets are lost. Returns true if the congestion window was reduced.
Expand Down