Skip to content

Commit

Permalink
feat(wasmtime-cli): add UDS/WebTransport/QUIC support
Browse files Browse the repository at this point in the history
  • Loading branch information
rvolosatovs committed Nov 12, 2024
1 parent 7b02ba9 commit 6492159
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 0 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions crates/wasmtime-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ clap = { workspace = true, features = [
] }
futures = { workspace = true }
humantime = { workspace = true }
quinn = { workspace = true, features = [
"log",
"platform-verifier",
"ring",
"runtime-tokio",
"rustls",
] }
reqwest = { workspace = true }
tokio = { workspace = true, features = ["fs"] }
tokio-util = { workspace = true, features = ["codec"] }
Expand All @@ -46,7 +53,10 @@ wasmtime = { workspace = true, features = [
] }
wasmtime-wasi = { workspace = true }
wit-component = { workspace = true }
wtransport = { workspace = true, features = ["self-signed"] }
wrpc-cli = { workspace = true, features = ["nats"] }
wrpc-transport-nats = { workspace = true, features = ["async-nats-0_37"] }
wrpc-transport-quic = { workspace = true }
wrpc-transport-web = { workspace = true }
wrpc-transport = { workspace = true, features = ["net"] }
wrpc-runtime-wasmtime = { workspace = true }
15 changes: 15 additions & 0 deletions crates/wasmtime-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ use wrpc_runtime_wasmtime::{
use wrpc_transport::{Invoke, Serve};

mod nats;
mod quic;
mod tcp;
#[cfg(unix)]
mod unix;
mod web;

const DEFAULT_TIMEOUT: &str = "10s";

Expand All @@ -37,7 +41,14 @@ enum Command {
#[command(subcommand)]
Nats(nats::Command),
#[command(subcommand)]
Quic(quic::Command),
#[command(subcommand)]
Tcp(tcp::Command),
#[cfg(unix)]
#[command(subcommand)]
Unix(unix::Command),
#[command(subcommand)]
Web(web::Command),
}

pub enum Workload {
Expand Down Expand Up @@ -637,6 +648,10 @@ pub async fn run() -> anyhow::Result<()> {
wrpc_cli::tracing::init();
match Command::parse() {
Command::Nats(args) => nats::run(args).await,
Command::Quic(args) => quic::run(args).await,
Command::Tcp(args) => tcp::run(args).await,
#[cfg(unix)]
Command::Unix(args) => unix::run(args).await,
Command::Web(args) => web::run(args).await,
}
}
68 changes: 68 additions & 0 deletions crates/wasmtime-cli/src/quic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use core::net::{Ipv6Addr, SocketAddr};

use anyhow::Context as _;
use clap::Parser;
use quinn::{ClientConfig, Endpoint};
use tracing::instrument;

pub const DEFAULT_ADDR: &str = "[::1]:4433";

/// QUIC transport
#[derive(Parser, Debug)]
pub enum Command {
Run(RunArgs),
}

/// Run a command component
#[derive(Parser, Debug)]
pub struct RunArgs {
/// Invocation timeout
#[arg(long, default_value = crate::DEFAULT_TIMEOUT)]
timeout: humantime::Duration,

/// Address to send import invocations to
#[arg(long, default_value = DEFAULT_ADDR)]
import_addr: SocketAddr,

/// Server name to use for import connection
#[arg(long)]
import_san: String,

/// Path or URL to Wasm command component
workload: String,
}

#[instrument(level = "trace", ret(level = "trace"))]
pub async fn handle_run(
RunArgs {
timeout,
import_addr,
import_san,
ref workload,
}: RunArgs,
) -> anyhow::Result<()> {
let mut ep = Endpoint::client((Ipv6Addr::UNSPECIFIED, 0).into())
.context("failed to create QUIC endpoint")?;
// TODO: Allow TLS configuration via runtime flags
// TODO: Support WebPKI
ep.set_default_client_config(ClientConfig::with_platform_verifier());
let conn = ep
.connect(import_addr, &import_san)
.context("failed to connect using QUIC")?;
let conn = conn.await.context("failed to establish QUIC connection")?;
crate::handle_run(
wrpc_transport_quic::Client::from(conn),
(),
*timeout,
workload,
)
.await
}

#[instrument(level = "trace", ret(level = "trace"))]
pub async fn run(cmd: Command) -> anyhow::Result<()> {
match cmd {
Command::Run(args) => handle_run(args).await,
// TODO: Implement serving
}
}
110 changes: 110 additions & 0 deletions crates/wasmtime-cli/src/unix.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use std::path::PathBuf;
use std::sync::Arc;

use anyhow::Context as _;
use clap::Parser;
use tracing::{error, instrument};

/// Unix Domain Socket transport
#[derive(Parser, Debug)]
pub enum Command {
Run(RunArgs),
Serve(ServeArgs),
}

/// Run a command component
#[derive(Parser, Debug)]
pub struct RunArgs {
/// Invocation timeout
#[arg(long, default_value = crate::DEFAULT_TIMEOUT)]
timeout: humantime::Duration,

/// Path to send import invocations to
#[arg(long)]
import: PathBuf,

/// Path or URL to Wasm command component
workload: String,
}

/// Serve a reactor component
#[derive(Parser, Debug)]
pub struct ServeArgs {
/// Invocation timeout
#[arg(long, default_value = crate::DEFAULT_TIMEOUT)]
timeout: humantime::Duration,

/// Path to send import invocations to
#[arg(long)]
import: PathBuf,

/// Path to listen for export invocations on
#[arg(long)]
export: PathBuf,

/// Path or URL to Wasm command component
workload: String,
}

#[instrument(level = "trace", ret(level = "trace"))]
pub async fn handle_run(
RunArgs {
timeout,
import,
ref workload,
}: RunArgs,
) -> anyhow::Result<()> {
crate::handle_run(
wrpc_transport::unix::Client::from(import),
(),
*timeout,
workload,
)
.await
}

#[instrument(level = "trace", ret(level = "trace"))]
pub async fn handle_serve(
ServeArgs {
timeout,
export,
import,
ref workload,
}: ServeArgs,
) -> anyhow::Result<()> {
let lis = tokio::net::UnixListener::bind(&export).with_context(|| {
format!(
"failed to bind Unix socket listener on `{}`",
export.display()
)
})?;
let srv = Arc::new(wrpc_transport::Server::default());
let accept = tokio::spawn({
let srv = Arc::clone(&srv);
async move {
loop {
if let Err(err) = srv.accept(&lis).await {
error!(?err, "failed to accept Unix socket connection");
}
}
}
});
let res = crate::handle_serve(
srv.as_ref(),
wrpc_transport::unix::Client::from(import),
(),
*timeout,
workload,
)
.await;
accept.abort();
res
}

#[instrument(level = "trace", ret(level = "trace"))]
pub async fn run(cmd: Command) -> anyhow::Result<()> {
match cmd {
Command::Run(args) => handle_run(args).await,
Command::Serve(args) => handle_serve(args).await,
}
}
60 changes: 60 additions & 0 deletions crates/wasmtime-cli/src/web.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use anyhow::Context as _;
use clap::Parser;
use tracing::instrument;
use wtransport::{ClientConfig, Endpoint};

pub const DEFAULT_ADDR: &str = "https://localhost:4433";

/// WebTransport transport
#[derive(Parser, Debug)]
pub enum Command {
Run(RunArgs),
}

/// Run a command component
#[derive(Parser, Debug)]
pub struct RunArgs {
/// Invocation timeout
#[arg(long, default_value = crate::DEFAULT_TIMEOUT)]
timeout: humantime::Duration,

/// Address to send import invocations to
#[arg(long, default_value = DEFAULT_ADDR)]
import: String,

/// Path or URL to Wasm command component
workload: String,
}

#[instrument(level = "trace", ret(level = "trace"))]
pub async fn handle_run(
RunArgs {
timeout,
import,
ref workload,
}: RunArgs,
) -> anyhow::Result<()> {
let ep = Endpoint::client(ClientConfig::default())
.context("failed to create WebTransport endpoint")?;
// TODO: Allow TLS configuration via runtime flags
// TODO: Support WebPKI
let conn = ep
.connect(import)
.await
.context("failed to connect using WebTransport")?;
crate::handle_run(
wrpc_transport_web::Client::from(conn),
(),
*timeout,
workload,
)
.await
}

#[instrument(level = "trace", ret(level = "trace"))]
pub async fn run(cmd: Command) -> anyhow::Result<()> {
match cmd {
Command::Run(args) => handle_run(args).await,
// TODO: Implement serving
}
}

0 comments on commit 6492159

Please sign in to comment.