Skip to content

Commit

Permalink
wip: jco serve
Browse files Browse the repository at this point in the history
  • Loading branch information
guybedford committed Dec 8, 2023
1 parent 21ee17f commit 82b7fa0
Show file tree
Hide file tree
Showing 12 changed files with 796 additions and 595 deletions.
6 changes: 6 additions & 0 deletions packages/preview2-shim/lib/io/calls.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ export const FUTURE_DISPOSE = ++call_id << CALL_SHIFT;
// Http
export const HTTP_CREATE_REQUEST = ++call_id << 24;
export const HTTP_OUTPUT_STREAM_FINISH = ++call_id << CALL_SHIFT;
// Http server
export const HTTP_SERVER_START = ++call_id << CALL_SHIFT;
export const HTTP_SERVER_STOP = ++call_id << CALL_SHIFT;
export const HTTP_SERVER_INCOMING_HANDLER = ++call_id << CALL_SHIFT;
export const HTTP_SERVER_SET_OUTGOING_RESPONSE = ++call_id << CALL_SHIFT;
export const HTTP_SERVER_CLEAR_OUTGOING_RESPONSE = ++call_id << CALL_SHIFT;

// Clocks
export const CLOCKS_NOW = ++call_id << 24;
Expand Down
58 changes: 58 additions & 0 deletions packages/preview2-shim/lib/io/worker-http.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,63 @@
import { Readable } from "node:stream";
import { createStream, getStreamOrThrow } from "./worker-thread.js";
import { createServer } from "node:http";
import { parentPort } from "node:worker_threads";
import { HTTP_SERVER_INCOMING_HANDLER } from "./calls.js";

const servers = new Map();

let responseCnt = 0;
const responses = new Map();

export async function stopHttpServer(id) {
await new Promise((resolve) => servers.get(id).close(resolve));
}

export function clearOutgoingResponse(id) {
responses.delete(id);
}

export async function setOutgoingResponse(
id,
{ statusCode, headers, streamId }
) {
const response = responses.get(id);
const textDecoder = new TextDecoder();
response.writeHead(
statusCode,
Object.fromEntries(
headers.map(([key, val]) => [key, textDecoder.decode(val)])
)
);
const { stream } = getStreamOrThrow(streamId);
stream.pipe(response);
responses.delete(id);
}

export async function startHttpServer(id, { port, host }) {
const server = createServer((req, res) => {
// create the streams and their ids
const streamId = createStream(req);
const responseId = ++responseCnt;
parentPort.postMessage({
type: HTTP_SERVER_INCOMING_HANDLER,
id,
payload: {
responseId,
method: req.method,
pathWithQuery: req.url,
headers: Object.entries(req.headers),
streamId,
},
});
responses.set(responseId, res);
});
await new Promise((resolve, reject) => {
server.listen(port, host, resolve);
server.on("error", reject);
});
servers.set(id, server);
}

export async function createHttpRequest(method, url, headers, bodyId) {
let body = null;
Expand Down
18 changes: 17 additions & 1 deletion packages/preview2-shim/lib/io/worker-io.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
POLL_POLL_LIST,
POLL_POLLABLE_BLOCK,
POLL_POLLABLE_READY,
HTTP_SERVER_INCOMING_HANDLER,
} from "./calls.js";
import { STDERR } from "./calls.js";

Expand All @@ -33,10 +34,25 @@ const workerPath = fileURLToPath(
new URL("./worker-thread.js", import.meta.url)
);


const httpIncomingHandlers = new Map();
export function registerIncomingHttpHandler (id, handler) {
httpIncomingHandlers.set(id, handler);
}

/**
* @type {(call: number, id: number | null, payload: any) -> any}
*/
export let ioCall = createSyncFn(workerPath);
export let ioCall = createSyncFn(workerPath, (type, id, payload) => {
// 'callbacks' from the worker
// ONLY happens for an http server incoming handler, and NOTHING else (not even sockets, since accept is sync!)
if (type !== HTTP_SERVER_INCOMING_HANDLER)
throw new Error('Internal error: only incoming handler callback is permitted');
const handler = httpIncomingHandlers.get(id);
if (!handler)
throw new Error(`Internal error: no incoming handler registered for server ${id}`);
handler(payload);
});
if (DEBUG) {
const _ioCall = ioCall;
ioCall = function ioCall(num, id, payload) {
Expand Down
14 changes: 13 additions & 1 deletion packages/preview2-shim/lib/io/worker-thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { resolve } from "node:dns/promises";
import { createReadStream, createWriteStream } from "node:fs";
import { stdout, stderr, hrtime, _rawDebug, exit } from "node:process";
import { runAsWorker } from "../synckit/index.js";
import { createHttpRequest } from "./worker-http.js";
import { createHttpRequest, startHttpServer, stopHttpServer, setOutgoingResponse, clearOutgoingResponse } from "./worker-http.js";
import { Writable } from "node:stream";

import {
Expand All @@ -16,6 +16,8 @@ import {
FUTURE_GET_VALUE_AND_DISPOSE,
HTTP_CREATE_REQUEST,
HTTP_OUTPUT_STREAM_FINISH,
HTTP_SERVER_START,
HTTP_SERVER_STOP,
INPUT_STREAM_BLOCKING_READ,
INPUT_STREAM_BLOCKING_SKIP,
INPUT_STREAM_CREATE,
Expand Down Expand Up @@ -47,6 +49,8 @@ import {
STDERR,
STDIN,
STDOUT,
HTTP_SERVER_SET_OUTGOING_RESPONSE,
HTTP_SERVER_CLEAR_OUTGOING_RESPONSE,
} from "./calls.js";

let streamCnt = 0,
Expand Down Expand Up @@ -195,6 +199,14 @@ function handle(call, id, payload) {
stream.end();
break;
}
case HTTP_SERVER_START:
return startHttpServer(id, payload);
case HTTP_SERVER_STOP:
return stopHttpServer(id);
case HTTP_SERVER_SET_OUTGOING_RESPONSE:
return setOutgoingResponse(id, payload);
case HTTP_SERVER_CLEAR_OUTGOING_RESPONSE:
return clearOutgoingResponse(id);

// Sockets
case SOCKET_RESOLVE_ADDRESS_CREATE_REQUEST:
Expand Down
Loading

0 comments on commit 82b7fa0

Please sign in to comment.