From a0d65f001a16f89468fff594c14cd090a99afd61 Mon Sep 17 00:00:00 2001 From: Rob Herley Date: Wed, 29 Jan 2025 00:01:29 -0500 Subject: [PATCH] set headers/status from wasi --- examples/rust/src/main.rs | 13 +++++---- src/lib.rs | 7 +++++ src/main.rs | 54 ++++++++++++++++++++++++++----------- src/runtime/mod.rs | 1 + src/runtime/sandbox.rs | 56 +++++++++++++++++++++++++++++++++++---- wit/funcgg.wit | 2 +- 6 files changed, 107 insertions(+), 26 deletions(-) diff --git a/examples/rust/src/main.rs b/examples/rust/src/main.rs index 5116910..04fab36 100644 --- a/examples/rust/src/main.rs +++ b/examples/rust/src/main.rs @@ -6,11 +6,14 @@ wit_bindgen::generate!({ // https://github.com/bytecodealliance/wasi-rs fn main() { - funcgg::runtime::responder::set_header("X-Foo", "bar"); - funcgg::runtime::responder::set_status(200); + funcgg::function::responder::set_header("X-Foo", "bar"); + funcgg::function::responder::set_header("Content-Type", "application/json"); + funcgg::function::responder::set_status(201); - println!("Environment variables:"); - for (key, value) in std::env::vars() { - println!("{}: {}", key, value); + println!("{{"); + for (i, c) in ('a'..='z').enumerate() { + std::thread::sleep(std::time::Duration::from_millis(500)); + println!(" \"{}\": \"{}\",", i, c); } + println!("}}"); } diff --git a/src/lib.rs b/src/lib.rs index f9df561..7503641 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,7 @@ pub mod wit { pub mod http { use actix_web::{http, ResponseError}; + use anyhow::anyhow; use std::fmt::{Display, Formatter, Result}; #[derive(Debug)] @@ -39,4 +40,10 @@ pub mod http { Error { inner } } } + + impl From for Error { + fn from(err: tokio::sync::oneshot::error::RecvError) -> Error { + anyhow!(err).into() + } + } } diff --git a/src/main.rs b/src/main.rs index 70dd25e..e64bc12 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,14 +1,15 @@ -use actix_web::{ - middleware::Logger, post, rt::spawn, web, App, HttpResponse, HttpServer, Responder, Result, -}; +use actix_web::HttpResponseBuilder; +use actix_web::{middleware::Logger, post, rt::spawn, web, App, HttpServer, Responder, Result}; use funcgg::http::Error; +use funcgg::runtime::HTTPHead; use funcgg::{ runtime::Sandbox, streams::{InputStream, OutputStream}, }; use futures::StreamExt; use log::{error, warn}; -use tokio::sync::mpsc::channel; +use std::sync::{Arc, Mutex}; +use tokio::sync::{mpsc, oneshot}; // tokio_util::sync::CancellationToken // https://tokio.rs/tokio/topics/shutdown @@ -33,25 +34,48 @@ async fn handle(mut body: web::Payload) -> Result { }); let (stdout, mut output_rx) = OutputStream::new(); - let (body_tx, body_rx) = channel::>(1); + let (body_tx, body_rx) = mpsc::channel::>(1); + let (first_write_tx, first_write_rx) = oneshot::channel::<()>(); - spawn(async move { - while let Some(item) = output_rx.recv().await { - if let Err(err) = body_tx.send(Ok(item)).await { - warn!("unable to send chunk: {:?}", err); - break; + spawn({ + let mut first_write_tx = Some(first_write_tx); + async move { + while let Some(item) = output_rx.recv().await { + if let Some(first_write_tx) = first_write_tx.take() { + let _ = first_write_tx.send(()); + } + if let Err(err) = body_tx.send(Ok(item)).await { + warn!("unable to send chunk: {:?}", err); + break; + } } } }); - spawn(async move { - if let Err(err) = sandbox.call(stdin, stdout).await { - error!("sandbox error: {:?}", err); + let head = Arc::new(Mutex::new(HTTPHead::default())); + + spawn({ + let head = head.clone(); + async move { + if let Err(err) = sandbox.call(stdin, stdout, head).await { + error!("sandbox error: {:?}", err); + } + Ok::<(), Error>(()) } - Ok::<(), Error>(()) }); - Ok(HttpResponse::Ok().streaming(tokio_stream::wrappers::ReceiverStream::new(body_rx))) + _ = first_write_rx.await; + + let head = match head.lock() { + Ok(h) => h.clone(), + Err(err) => { + error!("unable to unwrap head: {:?}", err); + HTTPHead::default() + } + }; + + let mut builder: HttpResponseBuilder = head.into(); + Ok(builder.streaming(tokio_stream::wrappers::ReceiverStream::new(body_rx))) } #[actix_web::main] diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index e977f36..d00ffaf 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -1,3 +1,4 @@ mod sandbox; +pub use sandbox::HTTPHead; pub use sandbox::Sandbox; diff --git a/src/runtime/sandbox.rs b/src/runtime/sandbox.rs index 443675d..69eeefb 100644 --- a/src/runtime/sandbox.rs +++ b/src/runtime/sandbox.rs @@ -1,3 +1,5 @@ +use std::sync::{Arc, Mutex}; + use anyhow::Result; use log::{info, warn}; use tokio::spawn; @@ -17,13 +19,49 @@ const MAX_RUNTIME_DURATION: std::time::Duration = std::time::Duration::from_secs // epoch_interruption: https://docs.rs/wasmtime/latest/wasmtime/struct.Config.html#method.epoch_interruption // fuel: https://docs.rs/wasmtime/latest/wasmtime/struct.Config.html#method.consume_fuel +#[derive(Debug, Clone)] +pub struct HTTPHead { + pub status: u16, + pub headers: Vec<(String, String)>, +} + +impl From for actix_web::HttpResponseBuilder { + fn from(head: HTTPHead) -> Self { + let status = match actix_web::http::StatusCode::from_u16(head.status) { + Ok(status) => status, + Err(_) => { + warn!("invalid status code: {:?}", head.status); + actix_web::http::StatusCode::INTERNAL_SERVER_ERROR + } + }; + + let mut builder = actix_web::HttpResponseBuilder::new(status); + for (key, value) in head.headers { + if let Ok(header_value) = actix_web::http::header::HeaderValue::from_str(&value) { + let _ = builder.append_header((key, header_value)); + } + } + builder + } +} + +impl Default for HTTPHead { + fn default() -> Self { + Self { + status: 200, + headers: vec![], + } + } +} + pub struct State { ctx: WasiCtx, table: ResourceTable, + head: Arc>, } impl State { - pub fn new(stdin: InputStream, stdout: OutputStream) -> Self { + pub fn new(stdin: InputStream, stdout: OutputStream, head: Arc>) -> Self { let ctx = WasiCtxBuilder::new() .env("FUNCGG", "1") .stdin(AsyncStdinStream::from(stdin)) @@ -33,6 +71,7 @@ impl State { Self { ctx, table: ResourceTable::default(), + head, } } } @@ -46,13 +85,19 @@ impl WasiView for State { } } -impl wit::funcgg::runtime::responder::Host for State { +impl wit::funcgg::function::responder::Host for State { async fn set_status(&mut self, status: u16) { info!("set_status: {:?}", status); + if let Ok(mut head) = self.head.lock() { + head.status = status; + } } async fn set_header(&mut self, key: String, value: String) { info!("set_header: {:?}={:?}", key, value); + if let Ok(mut head) = self.head.lock() { + head.headers.push((key, value)); + } } } @@ -90,8 +135,9 @@ impl Sandbox { &mut self, stdin: InputStream, stdout: OutputStream, + head: Arc>, ) -> Result<(), anyhow::Error> { - let state = State::new(stdin, stdout); + let state = State::new(stdin, stdout, head); let mut store = Store::new(&self.engine, state); store.set_epoch_deadline(1); @@ -112,10 +158,10 @@ impl Sandbox { }); let command = Command::instantiate_async(&mut store, &self.component, &self.linker).await?; - let result = command.wasi_cli_run().call_run(&mut store).await?; + // exit codes are "unstable" still: https://github.com/WebAssembly/wasi-cli/blob/d4fddec89fb9354509dbfa29a5557c58983f327a/wit/exit.wit#L15 + let result = command.wasi_cli_run().call_run(&mut store).await; _ = finished_tx.send(()); - // exit codes are "unstable" still: https://github.com/WebAssembly/wasi-cli/blob/d4fddec89fb9354509dbfa29a5557c58983f327a/wit/exit.wit#L15 match result { Ok(_) => Ok(()), Err(_) => Err(anyhow::anyhow!("wasi command failed")), diff --git a/wit/funcgg.wit b/wit/funcgg.wit index b44c3e9..c5504e6 100644 --- a/wit/funcgg.wit +++ b/wit/funcgg.wit @@ -1,4 +1,4 @@ -package funcgg:runtime@0.1.0; +package funcgg:function@0.1.0; // interface http { // type status = u32;