Skip to content

Commit

Permalink
Merge branch 'main' into wasi-sockets-udp-refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
manekinekko authored Dec 14, 2023
2 parents 6b5351a + a13795a commit f9297f0
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 70 deletions.
27 changes: 15 additions & 12 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,21 @@ jobs:
name: Test
strategy:
matrix:
name: [
linux,
windows,
macos
os: [
ubuntu-latest,
windows-latest,
macos-latest
]
include:
- name: linux
os: ubuntu-latest
- name: windows
os: windows-latest
- name: macos
os: macos-latest
node: [18.x, 20.x, latest]
exclude:
- os: macos-latest
node: 20.x
- os: macos-latest
node: 18.x
- os: windows-latest
node: 20.x
- os: windows-latest
node: 18.x
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
Expand All @@ -53,7 +56,7 @@ jobs:
- uses: actions/setup-node@v3
with:
node-version: 'lts/*'
node-version: ${{ matrix.node }}
- name: Install NPM packages
run: npm install

Expand Down
7 changes: 6 additions & 1 deletion crates/js-component-bindgen/src/transpile_bindgen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,12 @@ impl<'a> Instantiator<'a, '_> {
.gen
.local_names
.get_or_create(
&format!("import:{}-{}", import_name, &func.name),
&format!(
"import:{}-{}-{}",
import_specifier,
maybe_iface_member.as_deref().unwrap_or(""),
&func.name
),
&func.name,
)
.0
Expand Down
68 changes: 46 additions & 22 deletions packages/preview2-shim/lib/io/worker-http.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
import { createStream, getStreamOrThrow } from "./worker-thread.js";
import { createServer, request as httpRequest } from "node:http";
import { request as httpsRequest } from "node:https";
import {
createServer,
request as httpRequest,
Agent as HttpAgent,
} from "node:http";
import { request as httpsRequest, Agent as HttpsAgent } from "node:https";
import { parentPort } from "node:worker_threads";
import { HTTP_SERVER_INCOMING_HANDLER } from "./calls.js";

const agentOptions = {
keepAlive: true,
};
const httpAgent = new HttpAgent(agentOptions);
const httpsAgent = new HttpsAgent(agentOptions);

const servers = new Map();

let responseCnt = 0;
Expand Down Expand Up @@ -59,7 +69,15 @@ export async function startHttpServer(id, { port, host }) {
servers.set(id, server);
}

export async function createHttpRequest(method, url, headers, bodyId) {
export async function createHttpRequest(
method,
url,
headers,
bodyId,
connectTimeout,
betweenBytesTimeout,
firstByteTimeout
) {
let stream = null;
if (bodyId) {
try {
Expand All @@ -84,47 +102,56 @@ export async function createHttpRequest(method, url, headers, bodyId) {
const parsedUrl = new URL(url);
let req;
switch (parsedUrl.protocol) {
case 'http:':
case "http:":
req = httpRequest({
agent: httpAgent,
method,
host: parsedUrl.hostname,
port: parsedUrl.port,
path: parsedUrl.pathname + parsedUrl.search,
headers
headers,
timeout: connectTimeout && Number(connectTimeout)
});
break;
case 'https:':
case "https:":
req = httpsRequest({
agent: httpsAgent,
method,
host: parsedUrl.hostname,
port: parsedUrl.port,
path: parsedUrl.pathname + parsedUrl.search,
headers
headers,
timeout: connectTimeout && Number(connectTimeout)
});
break;
default:
throw { tag: 'HTTP-protocol-error' };
throw { tag: "HTTP-protocol-error" };
}
if (stream) {
stream.pipe(req);
} else {
req.end();
}
const res = await new Promise((resolve, reject) => {
req.on('response', resolve);
req.on('close', () => reject);
req.on('error', reject);
req.on("response", resolve);
req.on("close", () => reject);
req.on("error", reject);
});
res.on('end', () => void res.emit("readable"));
if (firstByteTimeout)
res.setTimeout(Number(firstByteTimeout));
if (betweenBytesTimeout)
res.on("readable", () => {
res.setTimeout(Number(betweenBytesTimeout));
});
res.on("end", () => void res.emit("readable"));
const bodyStreamId = createStream(res);
return {
status: res.statusCode,
headers: Array.from(Object.entries(res.headers)),
bodyStreamId: bodyStreamId,
};
} catch (e) {
if (e?.tag)
throw e;
if (e?.tag) throw e;
const err = getFirstError(e);
switch (err.code) {
case "ECONNRESET":
Expand All @@ -149,12 +176,9 @@ export async function createHttpRequest(method, url, headers, bodyId) {
}
}

function getFirstError (e) {
if (typeof e !== 'object' || e === null)
return e;
if (e.cause)
return getFirstError(e.cause);
if (e instanceof AggregateError)
return getFirstError(e.errors[0]);
function getFirstError(e) {
if (typeof e !== "object" || e === null) return e;
if (e.cause) return getFirstError(e.cause);
if (e instanceof AggregateError) return getFirstError(e.errors[0]);
return e;
}
}
32 changes: 7 additions & 25 deletions packages/preview2-shim/lib/io/worker-io.js
Original file line number Diff line number Diff line change
Expand Up @@ -231,16 +231,14 @@ class OutputStream {
return streamIoErrorCall(
OUTPUT_STREAM_SPLICE | this.#streamType,
this.#id,
src.#id,
len
{ src: src.#id, len }
);
}
blockingSplice(src, len) {
return streamIoErrorCall(
OUTPUT_STREAM_BLOCKING_SPLICE | this.#streamType,
this.#id,
inputStreamId(src),
len
{ src: inputStreamId(src), len }
);
}
subscribe() {
Expand Down Expand Up @@ -279,53 +277,37 @@ export const streams = { InputStream, OutputStream };

class Pollable {
#id;
#ready = false;
get _id() {
return this.#id;
}
ready() {
if (this.#ready) return true;
const ready = ioCall(POLL_POLLABLE_READY, this.#id);
if (ready) this.#ready = true;
return ready;
if (this.#id === 0) return true;
return ioCall(POLL_POLLABLE_READY, this.#id);
}
block() {
if (!this.#ready) {
ioCall(POLL_POLLABLE_BLOCK, this.#id);
this.#ready = true;
}
if (this.#id === 0) return;
ioCall(POLL_POLLABLE_BLOCK, this.#id);
}
static _getId(pollable) {
return pollable.#id;
}
static _create(id) {
const pollable = new Pollable();
pollable.#id = id;
if (id === 0) pollable.#ready = true;
return pollable;
}
static _markReady(pollable) {
pollable.#ready = true;
}
}

export const pollableCreate = Pollable._create;
delete Pollable._create;

const pollableMarkReady = Pollable._markReady;
delete Pollable._markReady;

const pollableGetId = Pollable._getId;
delete Pollable._getId;

export const poll = {
Pollable,
poll(list) {
const doneList = ioCall(POLL_POLL_LIST, null, list.map(pollableGetId));
for (const idx of doneList) {
pollableMarkReady(list[idx]);
}
return doneList;
return ioCall(POLL_POLL_LIST, null, list.map(pollableGetId));
},
};

Expand Down
27 changes: 22 additions & 5 deletions packages/preview2-shim/lib/io/worker-thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,26 @@ function handle(call, id, payload) {
switch (call) {
// Http
case HTTP_CREATE_REQUEST: {
const { method, url, headers, body } = payload;
return createFuture(createHttpRequest(method, url, headers, body));
const {
method,
url,
headers,
body,
connectTimeout,
betweenBytesTimeout,
firstByteTimeout,
} = payload;
return createFuture(
createHttpRequest(
method,
url,
headers,
body,
connectTimeout,
betweenBytesTimeout,
firstByteTimeout
)
);
}
case OUTPUT_STREAM_CREATE | HTTP: {
const stream = new PassThrough();
Expand Down Expand Up @@ -351,7 +369,6 @@ function handle(call, id, payload) {
case INPUT_STREAM_CREATE | STDIN: {
const stream = createReadStream(null, {
fd: 0,
autoClose: false,
highWaterMark: 64 * 1024,
});
// for some reason fs streams dont emit readable on end
Expand Down Expand Up @@ -622,7 +639,7 @@ function handle(call, id, payload) {
for (const [idx, id] of payload.entries()) {
if (!unfinishedPolls.has(id)) doneList.push(idx);
}
if (doneList.length > 0) return doneList;
if (doneList.length > 0) return new Uint32Array(doneList);
// if all polls are promise type, we just race them
return Promise.race(
payload.map((id) => unfinishedPolls.get(id))
Expand All @@ -632,7 +649,7 @@ function handle(call, id, payload) {
}
if (doneList.length === 0)
throw new Error("poll promise did not unregister poll");
return doneList;
return new Uint32Array(doneList);
});
}

Expand Down
16 changes: 12 additions & 4 deletions packages/preview2-shim/lib/nodejs/http.js
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,10 @@ class OutgoingRequest {
return this.#headers;
}
[symbolDispose]() {}
static _handle(request, _options) {
// TODO: handle options timeouts
static _handle(request, options) {
const connectTimeout = options?.connectTimeoutMs();
const betweenBytesTimeout = options?.betweenBytesTimeoutMs();
const firstByteTimeout = options?.firstByteTimeoutMs();
const scheme = schemeString(request.#scheme);
const url = scheme + request.#authority + (request.#pathWithQuery || "");
const headers = [["host", request.#authority]];
Expand All @@ -285,7 +287,10 @@ class OutgoingRequest {
request.#method.val || request.#method.tag,
url,
headers,
outgoingBodyOutputStreamId(request.#body)
outgoingBodyOutputStreamId(request.#body),
connectTimeout,
betweenBytesTimeout,
firstByteTimeout
);
}
}
Expand Down Expand Up @@ -412,13 +417,16 @@ class FutureIncomingResponse {
[symbolDispose]() {
if (this.#pollId) ioCall(FUTURE_DISPOSE | HTTP, this.#pollId);
}
static _create(method, url, headers, body) {
static _create(method, url, headers, body, connectTimeout, betweenBytesTimeout, firstByteTimeout) {
const res = new FutureIncomingResponse();
res.#pollId = ioCall(HTTP_CREATE_REQUEST, null, {
method,
url,
headers,
body,
connectTimeout,
betweenBytesTimeout,
firstByteTimeout
});
return res;
}
Expand Down
2 changes: 1 addition & 1 deletion submodules/wasmtime

0 comments on commit f9297f0

Please sign in to comment.