Skip to content

Commit

Permalink
feat: add path strategy and async implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
jpcsmith authored and mlegner committed Jan 17, 2024
1 parent 4f14b57 commit e719110
Show file tree
Hide file tree
Showing 7 changed files with 1,052 additions and 15 deletions.
15 changes: 15 additions & 0 deletions crates/scion-proto/src/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use std::{net::SocketAddr, ops::Deref};

use bytes::Bytes;
use chrono::{DateTime, Utc};
use scion_grpc::daemon::v1 as daemon_grpc;
use tracing::warn;

Expand Down Expand Up @@ -86,6 +87,20 @@ impl<T> Path<T> {
pub fn fingerprint(&self) -> Result<PathFingerprint, FingerprintError> {
PathFingerprint::try_from(self)
}

/// Returns the length of the path in terms of the number of interfaces, if available.
pub fn len(&self) -> Option<usize> {
if self.is_empty() {
Some(0)
} else {
self.metadata.as_ref().map(|m| m.interfaces.len())
}
}

/// Returns the expiry time of the path if the path contains metadata, otherwise None.
pub fn expiry_time(&self) -> Option<DateTime<Utc>> {
self.metadata.as_ref().map(|metadata| metadata.expiration)
}
}

#[allow(missing_docs)]
Expand Down
3 changes: 3 additions & 0 deletions crates/scion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ publish = false
async-trait = "0.1.74"
bytes = "1.5.0"
chrono = { workspace = true, features = ["clock"] }
futures = "0.3.30"
scion-grpc = { version = "0.1.0", path = "../scion-grpc" }
scion-proto = { version = "0.1.0", path = "../scion-proto" }
thiserror = { workspace = true }
Expand All @@ -17,6 +18,8 @@ tonic = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
mockall = "0.12.1"
tokio = { version = "1.34.0", features = ["rt-multi-thread", "macros", "test-util"] }
tracing-subscriber = "0.3.17"

[lints]
Expand Down
2 changes: 2 additions & 0 deletions crates/scion/src/pan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ pub use path_service::AsyncPathService;

mod error;
pub use error::{PathErrorKind, ReceiveError, SendError};

pub mod path_strategy;
6 changes: 3 additions & 3 deletions crates/scion/src/pan/datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,21 +132,21 @@ where
destination: <Self as AsyncScionDatagram>::Addr,
) -> Result<(), SendError> {
let path = self.path_to(*destination.as_ref()).await?;
self.send_to_via(payload, destination, path).await
self.send_to_via(payload, destination, &path).await
}

/// Send a datagram using [`AsyncScionDatagram::send_via`] with a path from the path service.
pub async fn send(&self, payload: Bytes) -> Result<(), SendError> {
if let Some(remote_addr) = self.remote_addr() {
// Use send_via here as it maintains the connected semantics of the function call.
let path = self.path_to(*remote_addr.as_ref()).await?;
self.send_via(payload, path).await
self.send_via(payload, &path).await
} else {
Err(SendError::Io(io::ErrorKind::NotConnected.into()))
}
}

async fn path_to(&self, remote_ia: IsdAsn) -> Result<&Path, SendError> {
async fn path_to(&self, remote_ia: IsdAsn) -> Result<Path, SendError> {
self.path_service
.path_to(remote_ia)
.await
Expand Down
83 changes: 71 additions & 12 deletions crates/scion/src/pan/path_service.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,84 @@
use std::future::Future;

use bytes::Bytes;
use chrono::Utc;
use scion_proto::{address::IsdAsn, path::Path};

/// Errors returned on a failed path lookup.
#[derive(Debug, thiserror::Error)]
pub enum PathLookupError {
/// Wildcard destinations cannot be queried for paths.
#[error("cannot query paths to wildcard destinations")]
WildcardDestination,
/// Path queries for the provided destination are not supported by this [`AsyncPathService`].
#[error("unsupported destination")]
UnsupportedDestination,
/// The destination can be queried, but there are no paths available to it.
#[error("no path available to destination")]
NoPath,
}

/// Trait for asynchronously retrieving paths to SCION ASes.
#[async_trait::async_trait]
pub trait AsyncPathService {
/// Return a path to the specified AS.
async fn path_to<'a>(&'a self, scion_as: IsdAsn) -> Result<&'a Path, PathLookupError>;
}
/// Associated iterator over returned paths.
type PathsTo: Iterator<Item = Path> + Send;

#[async_trait::async_trait]
impl AsyncPathService for Path<Bytes> {
/// Return a path to the specified AS.
async fn path_to(&self, scion_as: IsdAsn) -> Result<&Path, PathLookupError> {
if self.isd_asn.destination != scion_as {
/// Returns a *non-empty* iterator over paths to the specified SCION AS.
///
/// The order of the returned paths is arbitrary unless otherwise
/// specified by the implementation.
///
/// Returns an error if a wildcard AS is requested, if the destination is not supported
/// by this `AsyncPathService`, or if there are no paths available to the destination.
fn paths_to(
&self,
scion_as: IsdAsn,
) -> impl Future<Output = Result<Self::PathsTo, PathLookupError>> + Send;

/// Return the preferred path to the specified AS.
///
/// Returns an error if a wildcard AS is requested, if the destination is not supported
/// by this `AsyncPathService`, or if there are no paths available to the destination.
fn path_to(
&self,
scion_as: IsdAsn,
) -> impl Future<Output = Result<Path, PathLookupError>> + Send;

/// Returns true if the specified destination is supported by this `AsyncPathService`,
/// false otherwise.
///
/// A supported destination is one to which paths may be successfully queried, wildcard ASes are
/// therefore not supported.
fn is_supported_destination(&self, scion_as: IsdAsn) -> bool {
!scion_as.is_wildcard()
}

/// Returns an error if the destination is a wildcard SCION AS or is not otherwise supported by this
/// `AsyncPathService`, as determined by [`is_supported_destination`][Self::is_supported_destination].
fn check_destination(&self, scion_as: IsdAsn) -> Result<(), PathLookupError> {
if scion_as.is_wildcard() {
return Err(PathLookupError::WildcardDestination);
}
if !self.is_supported_destination(scion_as) {
return Err(PathLookupError::NoPath);
}
if let Some(metadata) = self.metadata.as_ref() {
if metadata.expiration < Utc::now() {

Ok(())
}
}

impl AsyncPathService for Path<Bytes> {
type PathsTo = std::iter::Once<Path>;

async fn paths_to(&self, scion_as: IsdAsn) -> Result<Self::PathsTo, PathLookupError> {
Ok(std::iter::once(self.path_to(scion_as).await?))
}

async fn path_to(&self, scion_as: IsdAsn) -> Result<Path, PathLookupError> {
self.check_destination(scion_as)?;

if let Some(expiry_time) = self.expiry_time() {
if expiry_time <= Utc::now() {
tracing::warn!(
destination=%scion_as,
path=?self,
Expand All @@ -32,6 +87,10 @@ impl AsyncPathService for Path<Bytes> {
return Err(PathLookupError::NoPath);
}
}
Ok(self)
Ok(self.clone())
}

fn is_supported_destination(&self, scion_as: IsdAsn) -> bool {
self.isd_asn.destination == scion_as
}
}
107 changes: 107 additions & 0 deletions crates/scion/src/pan/path_strategy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
//! Strategies for caching and selecting paths.
use std::time::{Duration, Instant};

use scion_proto::{address::IsdAsn, path::Path};

mod async_strategy;
pub use async_strategy::AsyncPathStrategy;

/// Errors returned when fetching paths from a [`PathStrategy`].
#[derive(Debug, thiserror::Error)]
pub enum PathFetchError {
/// The requested destination is not supported by this strategy,
/// and will never return a valid result.
#[error("the requested destination is not supported by this strategy")]
UnsupportedDestination,
}

/// Requests that a path strategy can make on its controller.
pub enum Request {
/// Requests that the controller queries paths to the specified destination.
LookupPathsTo(IsdAsn),
/// Requests that the controller calls back the strategy after the specified duration.
Callback(Duration),
}

/// Trait for objects defining the strategy for querying and caching paths.
///
/// An implementation of a `PathStrategy` serves three functions, it
///
/// - defines a state machine that determines when and for which ASes path queries
/// should be made;
/// - filters and caches the returned paths; and
/// - determines, which paths to provide to clients based on specific and possibly
/// configurable metrics.
///
/// The resulting state machine may be expected to be used as follows. After initialization,
/// an initial call to [`poll_request`][Self::poll_requests] is made to determine the initial
/// request. While `poll_requests` returns a request for a path query ([`Request::LookupPathsTo`]),
/// it must be called repeatedly until it returns a request for a callback at a later time
/// ([`Request::Callback`]).
///
/// While waiting for its callback, one or more paths may arrive and be processed with
/// [`handle_lookup_paths`][Self::handle_lookup_paths]; after which, a call to `poll_requests`
/// must be immediately made as handling the incoming paths may cause new requests to be
/// generated. If no paths arrive before the callback duration, `poll_requests` is directly
/// called.
///
/// This flow is depicted in the following diagram:
///
/// ```text
/// LookupPathsTo(_)
/// ┌───┐
/// ┌──────┐ ┌──▼───┴────────────┐Callback(_) ┌─────────┐
/// │*Init*├──►│ poll_requests(..) ├────────────►│*Waiting*│
/// └──────┘ └─────────▲──────▲──┘ *timeout*└─┬───┬───┘
/// │ └──────────────────┘ │
/// │ │
/// ┌─────┐ │ *lookup completes*│
/// *more paths*│ ┌─▼─┴─────────────────────┐ │
/// └───┤ handle_lookup_paths(..) │◄──────┘
/// └─────────────────────────┘
/// ```
///
/// See also [`AsyncPathStrategy`] which wraps the a provided `PathStrategy` and asynchronously
/// handles its requests for callbacks and path queries.
pub trait PathStrategy {
/// Iterator over paths cached by the strategy.
type PathsTo<'p>: Iterator<Item = &'p Path>
where
Self: 'p;

/// Returns an iterator over paths cached by the path strategy.
///
/// The order of the paths is implementation dependent.
fn paths_to(
&self,
destination: IsdAsn,
now: Instant,
) -> Result<Self::PathsTo<'_>, PathFetchError>;

/// Returns a path from the local cache, as chosen by the strategy.
fn path_to(&self, destination: IsdAsn, now: Instant) -> Result<Option<&Path>, PathFetchError>;

/// Returns true if there are paths available to the destination at the provided point in time.
///
/// Subsequent call to [`path_to`][Self::path_to] or [`paths_to`][Self::paths_to] for the given
/// value of `now` are guaranteed to return at least one path.
///
/// Errs if the destination ISD AS is not supported by this service.
fn is_path_available(&self, destination: IsdAsn, now: Instant) -> Result<bool, PathFetchError>;

/// Polls the `PathStrategy` for new [`Request`]s.
///
/// For a given `now` instant, repeated calls to this method should return requests that it
/// expects to be handled before the next callback, followed by a [`Request::Callback`] which
/// indicates the end of the stream of requests for the given `now` instant.
///
/// In addition to after the specified callback, this method must be called after the creation
/// of the strategy and after [`handle_lookup_paths`][Self::handle_lookup_paths] is called.
fn poll_requests(&mut self, now: Instant) -> Request;

/// Filter and store the provided paths.
///
/// The provided paths should correspond to an earlier request, but implementations
/// should be prepared to filter and discard paths to an unexpected destination.
fn handle_lookup_paths(&mut self, paths: &[Path], now: Instant);
}
Loading

0 comments on commit e719110

Please sign in to comment.