Skip to content

Commit

Permalink
Add jitter
Browse files Browse the repository at this point in the history
  • Loading branch information
Jake-Shadle committed Feb 11, 2025
1 parent 6d75481 commit 6359b27
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 1 deletion.
3 changes: 3 additions & 0 deletions src/net/xdp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,8 @@ pub fn spawn(workers: XdpWorkers, config: Arc<crate::Config>) -> Result<XdpLoop,
const BATCH_SIZE: usize = 64;
use xdp::packet::net_types::NetworkU16;

use crate::time::UtcTimestamp;

/// The core I/O loop
///
/// All of the ring operations are done in this loop so that the actual
Expand Down Expand Up @@ -393,6 +395,7 @@ fn io_loop(
sessions,
local_ipv4,
local_ipv6,
last_receive: UtcTimestamp::now(),
};

let mut rx_slab = xdp::HeapSlab::with_capacity(BATCH_SIZE);
Expand Down
17 changes: 16 additions & 1 deletion src/net/xdp/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
maxmind_db::{self, IpNetEntry},
EndpointAddress,
},
time::UtcTimestamp,
};
pub use quilkin_xdp::xdp;
use quilkin_xdp::xdp::{
Expand Down Expand Up @@ -114,6 +115,7 @@ pub struct State {
pub sessions: Arc<SessionState>,
pub local_ipv4: std::net::Ipv4Addr,
pub local_ipv6: std::net::Ipv6Addr,
pub last_receive: UtcTimestamp,
}

impl State {
Expand Down Expand Up @@ -440,6 +442,11 @@ pub fn process_packets(
let filters = state.config.filters.load();
let cm = state.config.clusters.clone_value();

let now = UtcTimestamp::now();
let jitter = (now - state.last_receive).nanos();
state.last_receive = now;
let mut had_read = false;

while let Some(inner) = rx_slab.pop_front() {
let Ok(Some(udp)) = UdpPacket::parse_packet(&inner) else {
unreachable!("we somehow got a non-UDP packet, this should be impossible with the eBPF program we use to route packets");
Expand All @@ -452,6 +459,7 @@ pub fn process_packets(

let is_client = udp.dst_port == state.external_port;
let direction = if is_client {
had_read = true;
metrics::READ
} else {
metrics::WRITE
Expand All @@ -465,7 +473,7 @@ pub fn process_packets(
if is_client {
process_client_packet(packet, umem, &filters, &cm, state, tx_slab)
} else {
process_server_packet(packet, umem, &filters, state, tx_slab)
process_server_packet(packet, umem, &filters, state, tx_slab, jitter)
}
};

Expand All @@ -483,6 +491,10 @@ pub fn process_packets(
}
}
}

if had_read {
metrics::packet_jitter(metrics::READ, &metrics::EMPTY).set(jitter);
}
}

#[inline]
Expand Down Expand Up @@ -630,6 +642,7 @@ fn process_server_packet(
filters: &crate::filters::FilterChain,
state: &mut State,
tx_slab: &mut HeapSlab,
jitter: i64,
) -> Result<Option<Packet>, (PipelineError, Packet)> {
let server_addr = SocketAddr::new(packet.udp.ips.source(), packet.udp.src_port.host());

Expand All @@ -638,6 +651,8 @@ fn process_server_packet(
return Ok(Some(packet.inner));
};

metrics::packet_jitter(metrics::Direction::Write, &asn).set(jitter);

let mut ctx = filters::WriteContext::new(server_addr.into(), client_addr.into(), packet);

let mut packet = match filters.write(&mut ctx) {
Expand Down

0 comments on commit 6359b27

Please sign in to comment.