From da885cefcfd7167b68518f07a7facfe0d2fff0be Mon Sep 17 00:00:00 2001 From: Roman Volosatovs Date: Sat, 9 Nov 2024 19:15:24 +0100 Subject: [PATCH 1/2] feat(wasmtime-cli): add UDS/WebTransport/QUIC support --- Cargo.lock | 4 ++ crates/wasmtime-cli/Cargo.toml | 10 +++ crates/wasmtime-cli/src/lib.rs | 15 +++++ crates/wasmtime-cli/src/quic.rs | 68 ++++++++++++++++++++ crates/wasmtime-cli/src/unix.rs | 110 ++++++++++++++++++++++++++++++++ crates/wasmtime-cli/src/web.rs | 60 +++++++++++++++++ 6 files changed, 267 insertions(+) create mode 100644 crates/wasmtime-cli/src/quic.rs create mode 100644 crates/wasmtime-cli/src/unix.rs create mode 100644 crates/wasmtime-cli/src/web.rs diff --git a/Cargo.lock b/Cargo.lock index e0b6c439..ff17b51d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5000,6 +5000,7 @@ dependencies = [ "clap", "futures", "humantime", + "quinn", "reqwest", "tokio", "tokio-util", @@ -5015,6 +5016,9 @@ dependencies = [ "wrpc-runtime-wasmtime", "wrpc-transport", "wrpc-transport-nats", + "wrpc-transport-quic", + "wrpc-transport-web", + "wtransport", ] [[package]] diff --git a/crates/wasmtime-cli/Cargo.toml b/crates/wasmtime-cli/Cargo.toml index e46c928f..21d22803 100644 --- a/crates/wasmtime-cli/Cargo.toml +++ b/crates/wasmtime-cli/Cargo.toml @@ -24,6 +24,13 @@ clap = { workspace = true, features = [ ] } futures = { workspace = true } humantime = { workspace = true } +quinn = { workspace = true, features = [ + "log", + "platform-verifier", + "ring", + "runtime-tokio", + "rustls", +] } reqwest = { workspace = true } tokio = { workspace = true, features = ["fs"] } tokio-util = { workspace = true, features = ["codec"] } @@ -47,7 +54,10 @@ wasmtime = { workspace = true, features = [ wasmtime-wasi = { workspace = true } wasmtime-wasi-http = { workspace = true } wit-component = { workspace = true } +wtransport = { workspace = true, features = ["self-signed"] } wrpc-cli = { workspace = true, features = ["nats"] } wrpc-transport-nats = { workspace = true, features = ["async-nats-0_37"] } +wrpc-transport-quic = { workspace = true } +wrpc-transport-web = { workspace = true } wrpc-transport = { workspace = true, features = ["net"] } wrpc-runtime-wasmtime = { workspace = true } diff --git a/crates/wasmtime-cli/src/lib.rs b/crates/wasmtime-cli/src/lib.rs index c5cf9545..92563f71 100644 --- a/crates/wasmtime-cli/src/lib.rs +++ b/crates/wasmtime-cli/src/lib.rs @@ -27,7 +27,11 @@ use wrpc_runtime_wasmtime::{ use wrpc_transport::{Invoke, Serve}; mod nats; +mod quic; mod tcp; +#[cfg(unix)] +mod unix; +mod web; const DEFAULT_TIMEOUT: &str = "10s"; @@ -37,7 +41,14 @@ enum Command { #[command(subcommand)] Nats(nats::Command), #[command(subcommand)] + Quic(quic::Command), + #[command(subcommand)] Tcp(tcp::Command), + #[cfg(unix)] + #[command(subcommand)] + Unix(unix::Command), + #[command(subcommand)] + Web(web::Command), } pub enum Workload { @@ -650,6 +661,10 @@ pub async fn run() -> anyhow::Result<()> { wrpc_cli::tracing::init(); match Command::parse() { Command::Nats(args) => nats::run(args).await, + Command::Quic(args) => quic::run(args).await, Command::Tcp(args) => tcp::run(args).await, + #[cfg(unix)] + Command::Unix(args) => unix::run(args).await, + Command::Web(args) => web::run(args).await, } } diff --git a/crates/wasmtime-cli/src/quic.rs b/crates/wasmtime-cli/src/quic.rs new file mode 100644 index 00000000..07af211e --- /dev/null +++ b/crates/wasmtime-cli/src/quic.rs @@ -0,0 +1,68 @@ +use core::net::{Ipv6Addr, SocketAddr}; + +use anyhow::Context as _; +use clap::Parser; +use quinn::{ClientConfig, Endpoint}; +use tracing::instrument; + +pub const DEFAULT_ADDR: &str = "[::1]:4433"; + +/// QUIC transport +#[derive(Parser, Debug)] +pub enum Command { + Run(RunArgs), +} + +/// Run a command component +#[derive(Parser, Debug)] +pub struct RunArgs { + /// Invocation timeout + #[arg(long, default_value = crate::DEFAULT_TIMEOUT)] + timeout: humantime::Duration, + + /// Address to send import invocations to + #[arg(long, default_value = DEFAULT_ADDR)] + import_addr: SocketAddr, + + /// Server name to use for import connection + #[arg(long)] + import_san: String, + + /// Path or URL to Wasm command component + workload: String, +} + +#[instrument(level = "trace", ret(level = "trace"))] +pub async fn handle_run( + RunArgs { + timeout, + import_addr, + import_san, + ref workload, + }: RunArgs, +) -> anyhow::Result<()> { + let mut ep = Endpoint::client((Ipv6Addr::UNSPECIFIED, 0).into()) + .context("failed to create QUIC endpoint")?; + // TODO: Allow TLS configuration via runtime flags + // TODO: Support WebPKI + ep.set_default_client_config(ClientConfig::with_platform_verifier()); + let conn = ep + .connect(import_addr, &import_san) + .context("failed to connect using QUIC")?; + let conn = conn.await.context("failed to establish QUIC connection")?; + crate::handle_run( + wrpc_transport_quic::Client::from(conn), + (), + *timeout, + workload, + ) + .await +} + +#[instrument(level = "trace", ret(level = "trace"))] +pub async fn run(cmd: Command) -> anyhow::Result<()> { + match cmd { + Command::Run(args) => handle_run(args).await, + // TODO: Implement serving + } +} diff --git a/crates/wasmtime-cli/src/unix.rs b/crates/wasmtime-cli/src/unix.rs new file mode 100644 index 00000000..9d8975bd --- /dev/null +++ b/crates/wasmtime-cli/src/unix.rs @@ -0,0 +1,110 @@ +use std::path::PathBuf; +use std::sync::Arc; + +use anyhow::Context as _; +use clap::Parser; +use tracing::{error, instrument}; + +/// Unix Domain Socket transport +#[derive(Parser, Debug)] +pub enum Command { + Run(RunArgs), + Serve(ServeArgs), +} + +/// Run a command component +#[derive(Parser, Debug)] +pub struct RunArgs { + /// Invocation timeout + #[arg(long, default_value = crate::DEFAULT_TIMEOUT)] + timeout: humantime::Duration, + + /// Path to send import invocations to + #[arg(long)] + import: PathBuf, + + /// Path or URL to Wasm command component + workload: String, +} + +/// Serve a reactor component +#[derive(Parser, Debug)] +pub struct ServeArgs { + /// Invocation timeout + #[arg(long, default_value = crate::DEFAULT_TIMEOUT)] + timeout: humantime::Duration, + + /// Path to send import invocations to + #[arg(long)] + import: PathBuf, + + /// Path to listen for export invocations on + #[arg(long)] + export: PathBuf, + + /// Path or URL to Wasm command component + workload: String, +} + +#[instrument(level = "trace", ret(level = "trace"))] +pub async fn handle_run( + RunArgs { + timeout, + import, + ref workload, + }: RunArgs, +) -> anyhow::Result<()> { + crate::handle_run( + wrpc_transport::unix::Client::from(import), + (), + *timeout, + workload, + ) + .await +} + +#[instrument(level = "trace", ret(level = "trace"))] +pub async fn handle_serve( + ServeArgs { + timeout, + export, + import, + ref workload, + }: ServeArgs, +) -> anyhow::Result<()> { + let lis = tokio::net::UnixListener::bind(&export).with_context(|| { + format!( + "failed to bind Unix socket listener on `{}`", + export.display() + ) + })?; + let srv = Arc::new(wrpc_transport::Server::default()); + let accept = tokio::spawn({ + let srv = Arc::clone(&srv); + async move { + loop { + if let Err(err) = srv.accept(&lis).await { + error!(?err, "failed to accept Unix socket connection"); + } + } + } + }); + let res = crate::handle_serve( + srv.as_ref(), + wrpc_transport::unix::Client::from(import), + (), + *timeout, + workload, + ) + .await; + accept.abort(); + res +} + +#[instrument(level = "trace", ret(level = "trace"))] +pub async fn run(cmd: Command) -> anyhow::Result<()> { + match cmd { + Command::Run(args) => handle_run(args).await, + Command::Serve(args) => handle_serve(args).await, + } +} diff --git a/crates/wasmtime-cli/src/web.rs b/crates/wasmtime-cli/src/web.rs new file mode 100644 index 00000000..fd3e183e --- /dev/null +++ b/crates/wasmtime-cli/src/web.rs @@ -0,0 +1,60 @@ +use anyhow::Context as _; +use clap::Parser; +use tracing::instrument; +use wtransport::{ClientConfig, Endpoint}; + +pub const DEFAULT_ADDR: &str = "https://localhost:4433"; + +/// WebTransport transport +#[derive(Parser, Debug)] +pub enum Command { + Run(RunArgs), +} + +/// Run a command component +#[derive(Parser, Debug)] +pub struct RunArgs { + /// Invocation timeout + #[arg(long, default_value = crate::DEFAULT_TIMEOUT)] + timeout: humantime::Duration, + + /// Address to send import invocations to + #[arg(long, default_value = DEFAULT_ADDR)] + import: String, + + /// Path or URL to Wasm command component + workload: String, +} + +#[instrument(level = "trace", ret(level = "trace"))] +pub async fn handle_run( + RunArgs { + timeout, + import, + ref workload, + }: RunArgs, +) -> anyhow::Result<()> { + let ep = Endpoint::client(ClientConfig::default()) + .context("failed to create WebTransport endpoint")?; + // TODO: Allow TLS configuration via runtime flags + // TODO: Support WebPKI + let conn = ep + .connect(import) + .await + .context("failed to connect using WebTransport")?; + crate::handle_run( + wrpc_transport_web::Client::from(conn), + (), + *timeout, + workload, + ) + .await +} + +#[instrument(level = "trace", ret(level = "trace"))] +pub async fn run(cmd: Command) -> anyhow::Result<()> { + match cmd { + Command::Run(args) => handle_run(args).await, + // TODO: Implement serving + } +} From 9ba0cc8591f77e9ce9bac4953a15c7d95ffc1c3c Mon Sep 17 00:00:00 2001 From: Roman Volosatovs Date: Sat, 9 Nov 2024 19:25:35 +0100 Subject: [PATCH 2/2] chore(cli-tracing): skip `cmd` in log Signed-off-by: Roman Volosatovs --- crates/wasmtime-cli/src/nats.rs | 2 +- crates/wasmtime-cli/src/quic.rs | 2 +- crates/wasmtime-cli/src/tcp.rs | 2 +- crates/wasmtime-cli/src/unix.rs | 2 +- crates/wasmtime-cli/src/web.rs | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/wasmtime-cli/src/nats.rs b/crates/wasmtime-cli/src/nats.rs index a09c3ced..83867f86 100644 --- a/crates/wasmtime-cli/src/nats.rs +++ b/crates/wasmtime-cli/src/nats.rs @@ -99,7 +99,7 @@ pub async fn handle_serve( crate::handle_serve(exports, imports, None, *timeout, workload).await } -#[instrument(level = "trace", ret(level = "trace"))] +#[instrument(level = "trace", skip_all, ret(level = "trace"))] pub async fn run(cmd: Command) -> anyhow::Result<()> { match cmd { Command::Run(args) => handle_run(args).await, diff --git a/crates/wasmtime-cli/src/quic.rs b/crates/wasmtime-cli/src/quic.rs index 07af211e..0f35ecdb 100644 --- a/crates/wasmtime-cli/src/quic.rs +++ b/crates/wasmtime-cli/src/quic.rs @@ -59,7 +59,7 @@ pub async fn handle_run( .await } -#[instrument(level = "trace", ret(level = "trace"))] +#[instrument(level = "trace", skip_all, ret(level = "trace"))] pub async fn run(cmd: Command) -> anyhow::Result<()> { match cmd { Command::Run(args) => handle_run(args).await, diff --git a/crates/wasmtime-cli/src/tcp.rs b/crates/wasmtime-cli/src/tcp.rs index af1be8f0..890e468f 100644 --- a/crates/wasmtime-cli/src/tcp.rs +++ b/crates/wasmtime-cli/src/tcp.rs @@ -99,7 +99,7 @@ pub async fn handle_serve( res } -#[instrument(level = "trace", ret(level = "trace"))] +#[instrument(level = "trace", skip_all, ret(level = "trace"))] pub async fn run(cmd: Command) -> anyhow::Result<()> { match cmd { Command::Run(args) => handle_run(args).await, diff --git a/crates/wasmtime-cli/src/unix.rs b/crates/wasmtime-cli/src/unix.rs index 9d8975bd..9cde0742 100644 --- a/crates/wasmtime-cli/src/unix.rs +++ b/crates/wasmtime-cli/src/unix.rs @@ -101,7 +101,7 @@ pub async fn handle_serve( res } -#[instrument(level = "trace", ret(level = "trace"))] +#[instrument(level = "trace", skip_all, ret(level = "trace"))] pub async fn run(cmd: Command) -> anyhow::Result<()> { match cmd { Command::Run(args) => handle_run(args).await, diff --git a/crates/wasmtime-cli/src/web.rs b/crates/wasmtime-cli/src/web.rs index fd3e183e..cc500d4e 100644 --- a/crates/wasmtime-cli/src/web.rs +++ b/crates/wasmtime-cli/src/web.rs @@ -51,7 +51,7 @@ pub async fn handle_run( .await } -#[instrument(level = "trace", ret(level = "trace"))] +#[instrument(level = "trace", skip_all, ret(level = "trace"))] pub async fn run(cmd: Command) -> anyhow::Result<()> { match cmd { Command::Run(args) => handle_run(args).await,