From ca48e9d4515bf04c14ad393ed15160e5b05eb9b9 Mon Sep 17 00:00:00 2001 From: Rob Herley Date: Sat, 18 Jan 2025 16:49:15 -0500 Subject: [PATCH] cleanup --- .github/workflows/cargo.yml | 1 + .gitignore | 2 ++ examples/go-hello-world/go.mod | 3 ++ examples/go-hello-world/main.go | 22 +++++++++++++ script/build-examples | 25 +++++++++++++++ src/main.rs | 52 ++++++++++++++++++++++-------- src/runtime/sandbox.rs | 15 +++++---- src/streams/output.rs | 57 ++++++++++++++++++--------------- 8 files changed, 130 insertions(+), 47 deletions(-) create mode 100644 examples/go-hello-world/go.mod create mode 100644 examples/go-hello-world/main.go create mode 100755 script/build-examples diff --git a/.github/workflows/cargo.yml b/.github/workflows/cargo.yml index a76545e..19c5c26 100644 --- a/.github/workflows/cargo.yml +++ b/.github/workflows/cargo.yml @@ -15,5 +15,6 @@ jobs: steps: - uses: actions/checkout@v4 + - run: script/build-examples - run: cargo build --verbose - run: cargo test --verbose diff --git a/.gitignore b/.gitignore index f68b3cd..d1fce48 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +examples/*/dist/ + tmp/ # Created by https://www.toptal.com/developers/gitignore/api/rust,visualstudiocode,macos diff --git a/examples/go-hello-world/go.mod b/examples/go-hello-world/go.mod new file mode 100644 index 0000000..b1044ff --- /dev/null +++ b/examples/go-hello-world/go.mod @@ -0,0 +1,3 @@ +module github.com/robherley/func.gg/examples/go-hello-world + +go 1.23.1 diff --git a/examples/go-hello-world/main.go b/examples/go-hello-world/main.go new file mode 100644 index 0000000..dbd23e9 --- /dev/null +++ b/examples/go-hello-world/main.go @@ -0,0 +1,22 @@ +package main + +import ( + "fmt" + "time" +) + +func main() { + fmt.Println("{") + + x := 26 + for i := range x { + fmt.Printf(" %q: %d", string('A'+i), i) + if i < x-1 { + fmt.Print(",") + } + fmt.Println() + time.Sleep(1 * time.Second) + } + + fmt.Println("}") +} diff --git a/script/build-examples b/script/build-examples new file mode 100755 index 0000000..a42cec1 --- /dev/null +++ b/script/build-examples @@ -0,0 +1,25 @@ +#!/usr/bin/env bash + +set -eo pipefail + +ROOT=$(realpath "$(dirname "$0")/..") +TMP="$ROOT/tmp" + +build_go_hello_world() { + DIR="$ROOT/examples/go-hello-world" + DIST="$DIR/dist" + mkdir -p "$DIST" + echo "Building $DIST/main.go" + GOOS=wasip1 GOARCH=wasm go build -o "$DIST/main.wasm" "$DIR/main.go" + + if [ -z "$CI" ]; then + echo "Building $DIST/main.tinygo.wasm" + GOOS=wasip1 GOARCH=wasm tinygo build -target wasi -o "$DIST/main.tinygo.wasm" "$DIR/main.go" + fi +} + +build_go_hello_world + +mkdir -p "$TMP" +rm -rf "$TMP/compile-cache" + diff --git a/src/main.rs b/src/main.rs index 96d0786..50b30a0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,8 @@ use func_gg::{ streams::{InputStream, OutputStream}, }; use futures::StreamExt; -use log::error; +use log::{error, warn}; +use tokio::sync::mpsc::channel; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -35,42 +36,65 @@ impl ResponseError for Error { // https://tokio.rs/tokio/topics/shutdown #[post("/")] // note: default payload limit is 256kB from actix-web, but is configurable with PayloadConfig async fn handle(mut body: web::Payload) -> Result { - let binary = include_bytes!("/Users/robherley/dev/webfunc-handler/dist/main.wasm"); + let binary = include_bytes!("../examples/go-hello-world/dist/main.wasm"); let mut sandbox = Sandbox::new(binary.to_vec())?; let (stdin, input_tx) = InputStream::new(); - // collect input from request body spawn(async move { while let Some(item) = body.next().await { - input_tx.send(item?).await?; + if let Err(e) = input_tx.send(item?).await { + error!("unable to send chunk: {:?}", e); + break; + } } Ok::<(), Error>(()) }); - let (stdout, output_rx, mut first_write_rx) = OutputStream::new(); + let (stdout, mut output_rx) = OutputStream::new(); + let (body_tx, body_rx) = channel::>(1); + let stream = tokio_stream::wrappers::ReceiverStream::new(body_rx); + + let (first_char_tx, first_char_rx) = tokio::sync::oneshot::channel::(); + + spawn(async move { + let mut first_char_tx = Some(first_char_tx); + while let Some(item) = output_rx.recv().await { + if let Some(tx) = first_char_tx.take() { + if let Err(err) = tx.send(item[0]) { + warn!("failed to send first char: {:?}", err); + } + } + + if let Err(e) = body_tx.send(Ok(item)).await { + warn!("unable to send chunk: {:?}", e); + break; + } + } + }); - // invoke the function spawn(async move { sandbox.call(stdin, stdout).await?; Ok::<(), Error>(()) }); - let content_type = match first_write_rx.recv().await { - Some(b'{') => "application/json", - Some(b'<') => "text/html", + let content_type = match first_char_rx.await { + Ok(b'{') => "application/json", + Ok(b'<') => "text/html", _ => "text/plain", }; - Ok(HttpResponse::Ok().content_type(content_type).streaming( - tokio_stream::wrappers::ReceiverStream::new(output_rx) - .map(|item| Ok::<_, Error>(actix_web::web::Bytes::from(item))), - )) + Ok(HttpResponse::Ok() + .content_type(content_type) + .streaming(stream)) } #[actix_web::main] async fn main() -> std::io::Result<()> { - env_logger::init_from_env(env_logger::Env::default().default_filter_or("info")); + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")) + .filter_module("wasmtime_wasi", log::LevelFilter::Warn) + .filter_module("tracing", log::LevelFilter::Warn) + .init(); let addr = format!( "{}:{}", diff --git a/src/runtime/sandbox.rs b/src/runtime/sandbox.rs index f039753..30d1bc4 100644 --- a/src/runtime/sandbox.rs +++ b/src/runtime/sandbox.rs @@ -1,3 +1,4 @@ +use std::env; use std::sync::Arc; use log::info; @@ -28,18 +29,16 @@ pub enum Error { pub struct Sandbox { #[allow(dead_code)] config: Config, - #[allow(dead_code)] engine: Engine, linker: Linker, module: Module, } -const CACHE_PATH: &str = "/Users/robherley/dev/func.gg/tmp/cache/compiled"; - impl Sandbox { pub fn new(binary: Vec) -> Result { + // NOTE: if config changes, we need to recompile the module let mut config = wasmtime::Config::default(); - config.debug_info(true); + config.debug_info(false); config.async_support(true); config.epoch_interruption(true); @@ -48,13 +47,15 @@ impl Sandbox { let mut linker: Linker = Linker::new(&engine); preview1::add_to_linker_async(&mut linker, |t| t)?; - let module = if std::path::Path::new(CACHE_PATH).exists() { + let cache_path = + std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("tmp/compile-cache"); + let module = if cache_path.exists() { info!("loading cached module from disk"); - unsafe { Module::deserialize_file(&engine, CACHE_PATH)? } + unsafe { Module::deserialize_file(&engine, cache_path)? } } else { info!("compiling module from binary"); let module = Module::new(&engine, binary)?; - std::fs::write(CACHE_PATH, module.serialize()?) + std::fs::write(cache_path, module.serialize()?) .map_err(|e| Error::Other(e.to_string()))?; module }; diff --git a/src/streams/output.rs b/src/streams/output.rs index 0e8f966..2dedb39 100644 --- a/src/streams/output.rs +++ b/src/streams/output.rs @@ -1,4 +1,7 @@ +use std::time::Instant; + use bytes::Bytes; +use log::{debug, error}; use tokio::sync::mpsc::{channel, Receiver, Sender}; use wasmtime_wasi::{ async_trait, HostOutputStream, StdoutStream, StreamError, StreamResult, Subscribe, @@ -7,45 +10,43 @@ use wasmtime_wasi::{ #[derive(Clone)] pub struct OutputStream { tx: Sender, - first_tx: Option>, } impl OutputStream { - pub fn new() -> (Self, Receiver, Receiver) { + pub fn new() -> (Self, Receiver) { let (tx, rx) = channel::(1); - let (first_tx, first_rx) = channel::(1); - ( - Self { - tx, - first_tx: Some(first_tx), - }, - rx, - first_rx, - ) + (Self { tx }, rx) } } #[async_trait] impl Subscribe for OutputStream { - async fn ready(&mut self) {} + async fn ready(&mut self) { + if self.tx.capacity() == 0 { + debug!("zero capacity, waiting for permit"); + let start = Instant::now(); + // asynchronously wait for a permit to be available, then immediately drop it to release it + // could cause some contention, consider making the buffer larger or the memory implication of unbounded + let permit = self.tx.reserve().await; + drop(permit); + let duration = start.elapsed(); + debug!("waiting for permit took {:?}", duration); + } + } } -#[async_trait] impl HostOutputStream for OutputStream { fn write(&mut self, buf: Bytes) -> StreamResult<()> { if buf.is_empty() { return Ok(()); } - if let Some(first_tx) = self.first_tx.take() { - if let Err(err) = first_tx.try_send(buf[0]) { - return Err(StreamError::LastOperationFailed(err.into())); - } - } - - match self.tx.try_send(Bytes::from(buf)) { + match self.tx.try_send(buf) { Ok(()) => Ok(()), - Err(err) => Err(StreamError::LastOperationFailed(err.into())), + Err(err) => { + error!("failed to send chunk: {:?}", err); + Err(StreamError::LastOperationFailed(err.into())) + } } } @@ -54,7 +55,11 @@ impl HostOutputStream for OutputStream { } fn check_write(&mut self) -> wasmtime_wasi::StreamResult { - Ok(usize::MAX) // unlimited + if self.tx.capacity() == 0 { + return Ok(0); + } + + Ok(usize::MAX) } } @@ -74,7 +79,7 @@ mod tests { #[tokio::test] async fn test_write() { - let (mut stream, mut rx, _) = OutputStream::new(); + let (mut stream, mut rx) = OutputStream::new(); let data = Bytes::from("hello"); stream.write(data.clone()).unwrap(); @@ -85,19 +90,19 @@ mod tests { #[tokio::test] async fn test_flush() { - let (mut stream, _, _) = OutputStream::new(); + let (mut stream, _) = OutputStream::new(); assert!(stream.flush().is_ok()); } #[tokio::test] async fn test_check_write() { - let (mut stream, _, _) = OutputStream::new(); + let (mut stream, _) = OutputStream::new(); assert_eq!(stream.check_write().unwrap(), usize::MAX); } #[tokio::test] async fn test_isatty() { - let (stream, _, _) = OutputStream::new(); + let (stream, _) = OutputStream::new(); assert!(!stream.isatty()); } }