Skip to content

Commit

Permalink
set headers/status from wasi
Browse files Browse the repository at this point in the history
  • Loading branch information
robherley committed Jan 29, 2025
1 parent 27c4233 commit a0d65f0
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 26 deletions.
13 changes: 8 additions & 5 deletions examples/rust/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ wit_bindgen::generate!({
// https://github.com/bytecodealliance/wasi-rs

fn main() {
funcgg::runtime::responder::set_header("X-Foo", "bar");
funcgg::runtime::responder::set_status(200);
funcgg::function::responder::set_header("X-Foo", "bar");
funcgg::function::responder::set_header("Content-Type", "application/json");
funcgg::function::responder::set_status(201);

println!("Environment variables:");
for (key, value) in std::env::vars() {
println!("{}: {}", key, value);
println!("{{");
for (i, c) in ('a'..='z').enumerate() {
std::thread::sleep(std::time::Duration::from_millis(500));
println!(" \"{}\": \"{}\",", i, c);
}
println!("}}");
}
7 changes: 7 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub mod wit {

pub mod http {
use actix_web::{http, ResponseError};
use anyhow::anyhow;
use std::fmt::{Display, Formatter, Result};

#[derive(Debug)]
Expand All @@ -39,4 +40,10 @@ pub mod http {
Error { inner }
}
}

impl From<tokio::sync::oneshot::error::RecvError> for Error {
fn from(err: tokio::sync::oneshot::error::RecvError) -> Error {
anyhow!(err).into()
}
}
}
54 changes: 39 additions & 15 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use actix_web::{
middleware::Logger, post, rt::spawn, web, App, HttpResponse, HttpServer, Responder, Result,
};
use actix_web::HttpResponseBuilder;
use actix_web::{middleware::Logger, post, rt::spawn, web, App, HttpServer, Responder, Result};
use funcgg::http::Error;
use funcgg::runtime::HTTPHead;
use funcgg::{
runtime::Sandbox,
streams::{InputStream, OutputStream},
};
use futures::StreamExt;
use log::{error, warn};
use tokio::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use tokio::sync::{mpsc, oneshot};

// tokio_util::sync::CancellationToken
// https://tokio.rs/tokio/topics/shutdown
Expand All @@ -33,25 +34,48 @@ async fn handle(mut body: web::Payload) -> Result<impl Responder, Error> {
});

let (stdout, mut output_rx) = OutputStream::new();
let (body_tx, body_rx) = channel::<Result<actix_web::web::Bytes, actix_web::Error>>(1);
let (body_tx, body_rx) = mpsc::channel::<Result<actix_web::web::Bytes, actix_web::Error>>(1);
let (first_write_tx, first_write_rx) = oneshot::channel::<()>();

spawn(async move {
while let Some(item) = output_rx.recv().await {
if let Err(err) = body_tx.send(Ok(item)).await {
warn!("unable to send chunk: {:?}", err);
break;
spawn({
let mut first_write_tx = Some(first_write_tx);
async move {
while let Some(item) = output_rx.recv().await {
if let Some(first_write_tx) = first_write_tx.take() {
let _ = first_write_tx.send(());
}
if let Err(err) = body_tx.send(Ok(item)).await {
warn!("unable to send chunk: {:?}", err);
break;
}
}
}
});

spawn(async move {
if let Err(err) = sandbox.call(stdin, stdout).await {
error!("sandbox error: {:?}", err);
let head = Arc::new(Mutex::new(HTTPHead::default()));

spawn({
let head = head.clone();
async move {
if let Err(err) = sandbox.call(stdin, stdout, head).await {
error!("sandbox error: {:?}", err);
}
Ok::<(), Error>(())
}
Ok::<(), Error>(())
});

Ok(HttpResponse::Ok().streaming(tokio_stream::wrappers::ReceiverStream::new(body_rx)))
_ = first_write_rx.await;

let head = match head.lock() {
Ok(h) => h.clone(),
Err(err) => {
error!("unable to unwrap head: {:?}", err);
HTTPHead::default()
}
};

let mut builder: HttpResponseBuilder = head.into();
Ok(builder.streaming(tokio_stream::wrappers::ReceiverStream::new(body_rx)))
}

#[actix_web::main]
Expand Down
1 change: 1 addition & 0 deletions src/runtime/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod sandbox;

pub use sandbox::HTTPHead;
pub use sandbox::Sandbox;
56 changes: 51 additions & 5 deletions src/runtime/sandbox.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::{Arc, Mutex};

use anyhow::Result;
use log::{info, warn};
use tokio::spawn;
Expand All @@ -17,13 +19,49 @@ const MAX_RUNTIME_DURATION: std::time::Duration = std::time::Duration::from_secs
// epoch_interruption: https://docs.rs/wasmtime/latest/wasmtime/struct.Config.html#method.epoch_interruption
// fuel: https://docs.rs/wasmtime/latest/wasmtime/struct.Config.html#method.consume_fuel

#[derive(Debug, Clone)]
pub struct HTTPHead {
pub status: u16,
pub headers: Vec<(String, String)>,
}

impl From<HTTPHead> for actix_web::HttpResponseBuilder {
fn from(head: HTTPHead) -> Self {
let status = match actix_web::http::StatusCode::from_u16(head.status) {
Ok(status) => status,
Err(_) => {
warn!("invalid status code: {:?}", head.status);
actix_web::http::StatusCode::INTERNAL_SERVER_ERROR
}
};

let mut builder = actix_web::HttpResponseBuilder::new(status);
for (key, value) in head.headers {
if let Ok(header_value) = actix_web::http::header::HeaderValue::from_str(&value) {
let _ = builder.append_header((key, header_value));
}
}
builder
}
}

impl Default for HTTPHead {
fn default() -> Self {
Self {
status: 200,
headers: vec![],
}
}
}

pub struct State {
ctx: WasiCtx,
table: ResourceTable,
head: Arc<Mutex<HTTPHead>>,
}

impl State {
pub fn new(stdin: InputStream, stdout: OutputStream) -> Self {
pub fn new(stdin: InputStream, stdout: OutputStream, head: Arc<Mutex<HTTPHead>>) -> Self {
let ctx = WasiCtxBuilder::new()
.env("FUNCGG", "1")
.stdin(AsyncStdinStream::from(stdin))
Expand All @@ -33,6 +71,7 @@ impl State {
Self {
ctx,
table: ResourceTable::default(),
head,
}
}
}
Expand All @@ -46,13 +85,19 @@ impl WasiView for State {
}
}

impl wit::funcgg::runtime::responder::Host for State {
impl wit::funcgg::function::responder::Host for State {
async fn set_status(&mut self, status: u16) {
info!("set_status: {:?}", status);
if let Ok(mut head) = self.head.lock() {
head.status = status;
}
}

async fn set_header(&mut self, key: String, value: String) {
info!("set_header: {:?}={:?}", key, value);
if let Ok(mut head) = self.head.lock() {
head.headers.push((key, value));
}
}
}

Expand Down Expand Up @@ -90,8 +135,9 @@ impl Sandbox {
&mut self,
stdin: InputStream,
stdout: OutputStream,
head: Arc<Mutex<HTTPHead>>,
) -> Result<(), anyhow::Error> {
let state = State::new(stdin, stdout);
let state = State::new(stdin, stdout, head);
let mut store = Store::new(&self.engine, state);
store.set_epoch_deadline(1);

Expand All @@ -112,10 +158,10 @@ impl Sandbox {
});

let command = Command::instantiate_async(&mut store, &self.component, &self.linker).await?;
let result = command.wasi_cli_run().call_run(&mut store).await?;
// exit codes are "unstable" still: https://github.com/WebAssembly/wasi-cli/blob/d4fddec89fb9354509dbfa29a5557c58983f327a/wit/exit.wit#L15
let result = command.wasi_cli_run().call_run(&mut store).await;
_ = finished_tx.send(());

// exit codes are "unstable" still: https://github.com/WebAssembly/wasi-cli/blob/d4fddec89fb9354509dbfa29a5557c58983f327a/wit/exit.wit#L15
match result {
Ok(_) => Ok(()),
Err(_) => Err(anyhow::anyhow!("wasi command failed")),
Expand Down
2 changes: 1 addition & 1 deletion wit/funcgg.wit
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package funcgg:runtime@0.1.0;
package funcgg:function@0.1.0;

// interface http {
// type status = u32;
Expand Down

0 comments on commit a0d65f0

Please sign in to comment.