Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
robherley committed Jan 18, 2025
1 parent 8ba6c14 commit ca48e9d
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 47 deletions.
1 change: 1 addition & 0 deletions .github/workflows/cargo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ jobs:

steps:
- uses: actions/checkout@v4
- run: script/build-examples
- run: cargo build --verbose
- run: cargo test --verbose
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
examples/*/dist/

tmp/

# Created by https://www.toptal.com/developers/gitignore/api/rust,visualstudiocode,macos
Expand Down
3 changes: 3 additions & 0 deletions examples/go-hello-world/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/robherley/func.gg/examples/go-hello-world

go 1.23.1
22 changes: 22 additions & 0 deletions examples/go-hello-world/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package main

import (
"fmt"
"time"
)

func main() {
fmt.Println("{")

x := 26
for i := range x {
fmt.Printf(" %q: %d", string('A'+i), i)
if i < x-1 {
fmt.Print(",")
}
fmt.Println()
time.Sleep(1 * time.Second)
}

fmt.Println("}")
}
25 changes: 25 additions & 0 deletions script/build-examples
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/usr/bin/env bash

set -eo pipefail

ROOT=$(realpath "$(dirname "$0")/..")
TMP="$ROOT/tmp"

build_go_hello_world() {
DIR="$ROOT/examples/go-hello-world"
DIST="$DIR/dist"
mkdir -p "$DIST"
echo "Building $DIST/main.go"
GOOS=wasip1 GOARCH=wasm go build -o "$DIST/main.wasm" "$DIR/main.go"

if [ -z "$CI" ]; then
echo "Building $DIST/main.tinygo.wasm"
GOOS=wasip1 GOARCH=wasm tinygo build -target wasi -o "$DIST/main.tinygo.wasm" "$DIR/main.go"
fi
}

build_go_hello_world

mkdir -p "$TMP"
rm -rf "$TMP/compile-cache"

52 changes: 38 additions & 14 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use func_gg::{
streams::{InputStream, OutputStream},
};
use futures::StreamExt;
use log::error;
use log::{error, warn};
use tokio::sync::mpsc::channel;

#[derive(thiserror::Error, Debug)]
pub enum Error {
Expand Down Expand Up @@ -35,42 +36,65 @@ impl ResponseError for Error {
// https://tokio.rs/tokio/topics/shutdown
#[post("/")] // note: default payload limit is 256kB from actix-web, but is configurable with PayloadConfig
async fn handle(mut body: web::Payload) -> Result<impl Responder, Error> {
let binary = include_bytes!("/Users/robherley/dev/webfunc-handler/dist/main.wasm");
let binary = include_bytes!("../examples/go-hello-world/dist/main.wasm");
let mut sandbox = Sandbox::new(binary.to_vec())?;

let (stdin, input_tx) = InputStream::new();

// collect input from request body
spawn(async move {
while let Some(item) = body.next().await {
input_tx.send(item?).await?;
if let Err(e) = input_tx.send(item?).await {
error!("unable to send chunk: {:?}", e);
break;
}
}
Ok::<(), Error>(())
});

let (stdout, output_rx, mut first_write_rx) = OutputStream::new();
let (stdout, mut output_rx) = OutputStream::new();
let (body_tx, body_rx) = channel::<Result<actix_web::web::Bytes, actix_web::Error>>(1);
let stream = tokio_stream::wrappers::ReceiverStream::new(body_rx);

let (first_char_tx, first_char_rx) = tokio::sync::oneshot::channel::<u8>();

spawn(async move {
let mut first_char_tx = Some(first_char_tx);
while let Some(item) = output_rx.recv().await {
if let Some(tx) = first_char_tx.take() {
if let Err(err) = tx.send(item[0]) {
warn!("failed to send first char: {:?}", err);
}
}

if let Err(e) = body_tx.send(Ok(item)).await {
warn!("unable to send chunk: {:?}", e);
break;
}
}
});

// invoke the function
spawn(async move {
sandbox.call(stdin, stdout).await?;
Ok::<(), Error>(())
});

let content_type = match first_write_rx.recv().await {
Some(b'{') => "application/json",
Some(b'<') => "text/html",
let content_type = match first_char_rx.await {
Ok(b'{') => "application/json",
Ok(b'<') => "text/html",
_ => "text/plain",
};

Ok(HttpResponse::Ok().content_type(content_type).streaming(
tokio_stream::wrappers::ReceiverStream::new(output_rx)
.map(|item| Ok::<_, Error>(actix_web::web::Bytes::from(item))),
))
Ok(HttpResponse::Ok()
.content_type(content_type)
.streaming(stream))
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
.filter_module("wasmtime_wasi", log::LevelFilter::Warn)
.filter_module("tracing", log::LevelFilter::Warn)
.init();

let addr = format!(
"{}:{}",
Expand Down
15 changes: 8 additions & 7 deletions src/runtime/sandbox.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::env;
use std::sync::Arc;

use log::info;
Expand Down Expand Up @@ -28,18 +29,16 @@ pub enum Error {
pub struct Sandbox {
#[allow(dead_code)]
config: Config,
#[allow(dead_code)]
engine: Engine,
linker: Linker<WasiP1Ctx>,
module: Module,
}

const CACHE_PATH: &str = "/Users/robherley/dev/func.gg/tmp/cache/compiled";

impl Sandbox {
pub fn new(binary: Vec<u8>) -> Result<Self, Error> {
// NOTE: if config changes, we need to recompile the module
let mut config = wasmtime::Config::default();
config.debug_info(true);
config.debug_info(false);
config.async_support(true);
config.epoch_interruption(true);

Expand All @@ -48,13 +47,15 @@ impl Sandbox {
let mut linker: Linker<WasiP1Ctx> = Linker::new(&engine);
preview1::add_to_linker_async(&mut linker, |t| t)?;

let module = if std::path::Path::new(CACHE_PATH).exists() {
let cache_path =
std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("tmp/compile-cache");
let module = if cache_path.exists() {
info!("loading cached module from disk");
unsafe { Module::deserialize_file(&engine, CACHE_PATH)? }
unsafe { Module::deserialize_file(&engine, cache_path)? }
} else {
info!("compiling module from binary");
let module = Module::new(&engine, binary)?;
std::fs::write(CACHE_PATH, module.serialize()?)
std::fs::write(cache_path, module.serialize()?)
.map_err(|e| Error::Other(e.to_string()))?;
module
};
Expand Down
57 changes: 31 additions & 26 deletions src/streams/output.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::time::Instant;

use bytes::Bytes;
use log::{debug, error};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use wasmtime_wasi::{
async_trait, HostOutputStream, StdoutStream, StreamError, StreamResult, Subscribe,
Expand All @@ -7,45 +10,43 @@ use wasmtime_wasi::{
#[derive(Clone)]
pub struct OutputStream {
tx: Sender<Bytes>,
first_tx: Option<Sender<u8>>,
}

impl OutputStream {
pub fn new() -> (Self, Receiver<Bytes>, Receiver<u8>) {
pub fn new() -> (Self, Receiver<Bytes>) {
let (tx, rx) = channel::<Bytes>(1);
let (first_tx, first_rx) = channel::<u8>(1);
(
Self {
tx,
first_tx: Some(first_tx),
},
rx,
first_rx,
)
(Self { tx }, rx)
}
}

#[async_trait]
impl Subscribe for OutputStream {
async fn ready(&mut self) {}
async fn ready(&mut self) {
if self.tx.capacity() == 0 {
debug!("zero capacity, waiting for permit");
let start = Instant::now();
// asynchronously wait for a permit to be available, then immediately drop it to release it
// could cause some contention, consider making the buffer larger or the memory implication of unbounded
let permit = self.tx.reserve().await;
drop(permit);
let duration = start.elapsed();
debug!("waiting for permit took {:?}", duration);
}
}
}

#[async_trait]
impl HostOutputStream for OutputStream {
fn write(&mut self, buf: Bytes) -> StreamResult<()> {
if buf.is_empty() {
return Ok(());
}

if let Some(first_tx) = self.first_tx.take() {
if let Err(err) = first_tx.try_send(buf[0]) {
return Err(StreamError::LastOperationFailed(err.into()));
}
}

match self.tx.try_send(Bytes::from(buf)) {
match self.tx.try_send(buf) {
Ok(()) => Ok(()),
Err(err) => Err(StreamError::LastOperationFailed(err.into())),
Err(err) => {
error!("failed to send chunk: {:?}", err);
Err(StreamError::LastOperationFailed(err.into()))
}
}
}

Expand All @@ -54,7 +55,11 @@ impl HostOutputStream for OutputStream {
}

fn check_write(&mut self) -> wasmtime_wasi::StreamResult<usize> {
Ok(usize::MAX) // unlimited
if self.tx.capacity() == 0 {
return Ok(0);
}

Ok(usize::MAX)
}
}

Expand All @@ -74,7 +79,7 @@ mod tests {

#[tokio::test]
async fn test_write() {
let (mut stream, mut rx, _) = OutputStream::new();
let (mut stream, mut rx) = OutputStream::new();

let data = Bytes::from("hello");
stream.write(data.clone()).unwrap();
Expand All @@ -85,19 +90,19 @@ mod tests {

#[tokio::test]
async fn test_flush() {
let (mut stream, _, _) = OutputStream::new();
let (mut stream, _) = OutputStream::new();
assert!(stream.flush().is_ok());
}

#[tokio::test]
async fn test_check_write() {
let (mut stream, _, _) = OutputStream::new();
let (mut stream, _) = OutputStream::new();
assert_eq!(stream.check_write().unwrap(), usize::MAX);
}

#[tokio::test]
async fn test_isatty() {
let (stream, _, _) = OutputStream::new();
let (stream, _) = OutputStream::new();
assert!(!stream.isatty());
}
}

0 comments on commit ca48e9d

Please sign in to comment.