Skip to content

Commit

Permalink
fix: HTTP agent support, timeouts (#313)
Browse files Browse the repository at this point in the history
  • Loading branch information
guybedford authored Dec 14, 2023
1 parent 995573b commit a13795a
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 40 deletions.
27 changes: 15 additions & 12 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,21 @@ jobs:
name: Test
strategy:
matrix:
name: [
linux,
windows,
macos
os: [
ubuntu-latest,
windows-latest,
macos-latest
]
include:
- name: linux
os: ubuntu-latest
- name: windows
os: windows-latest
- name: macos
os: macos-latest
node: [18.x, 20.x, latest]
exclude:
- os: macos-latest
node: 20.x
- os: macos-latest
node: 18.x
- os: windows-latest
node: 20.x
- os: windows-latest
node: 18.x
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
Expand All @@ -53,7 +56,7 @@ jobs:
- uses: actions/setup-node@v3
with:
node-version: 'lts/*'
node-version: ${{ matrix.node }}
- name: Install NPM packages
run: npm install

Expand Down
68 changes: 46 additions & 22 deletions packages/preview2-shim/lib/io/worker-http.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
import { createStream, getStreamOrThrow } from "./worker-thread.js";
import { createServer, request as httpRequest } from "node:http";
import { request as httpsRequest } from "node:https";
import {
createServer,
request as httpRequest,
Agent as HttpAgent,
} from "node:http";
import { request as httpsRequest, Agent as HttpsAgent } from "node:https";
import { parentPort } from "node:worker_threads";
import { HTTP_SERVER_INCOMING_HANDLER } from "./calls.js";

const agentOptions = {
keepAlive: true,
};
const httpAgent = new HttpAgent(agentOptions);
const httpsAgent = new HttpsAgent(agentOptions);

const servers = new Map();

let responseCnt = 0;
Expand Down Expand Up @@ -59,7 +69,15 @@ export async function startHttpServer(id, { port, host }) {
servers.set(id, server);
}

export async function createHttpRequest(method, url, headers, bodyId) {
export async function createHttpRequest(
method,
url,
headers,
bodyId,
connectTimeout,
betweenBytesTimeout,
firstByteTimeout
) {
let stream = null;
if (bodyId) {
try {
Expand All @@ -84,47 +102,56 @@ export async function createHttpRequest(method, url, headers, bodyId) {
const parsedUrl = new URL(url);
let req;
switch (parsedUrl.protocol) {
case 'http:':
case "http:":
req = httpRequest({
agent: httpAgent,
method,
host: parsedUrl.hostname,
port: parsedUrl.port,
path: parsedUrl.pathname + parsedUrl.search,
headers
headers,
timeout: connectTimeout && Number(connectTimeout)
});
break;
case 'https:':
case "https:":
req = httpsRequest({
agent: httpsAgent,
method,
host: parsedUrl.hostname,
port: parsedUrl.port,
path: parsedUrl.pathname + parsedUrl.search,
headers
headers,
timeout: connectTimeout && Number(connectTimeout)
});
break;
default:
throw { tag: 'HTTP-protocol-error' };
throw { tag: "HTTP-protocol-error" };
}
if (stream) {
stream.pipe(req);
} else {
req.end();
}
const res = await new Promise((resolve, reject) => {
req.on('response', resolve);
req.on('close', () => reject);
req.on('error', reject);
req.on("response", resolve);
req.on("close", () => reject);
req.on("error", reject);
});
res.on('end', () => void res.emit("readable"));
if (firstByteTimeout)
res.setTimeout(Number(firstByteTimeout));
if (betweenBytesTimeout)
res.on("readable", () => {
res.setTimeout(Number(betweenBytesTimeout));
});
res.on("end", () => void res.emit("readable"));
const bodyStreamId = createStream(res);
return {
status: res.statusCode,
headers: Array.from(Object.entries(res.headers)),
bodyStreamId: bodyStreamId,
};
} catch (e) {
if (e?.tag)
throw e;
if (e?.tag) throw e;
const err = getFirstError(e);
switch (err.code) {
case "ECONNRESET":
Expand All @@ -149,12 +176,9 @@ export async function createHttpRequest(method, url, headers, bodyId) {
}
}

function getFirstError (e) {
if (typeof e !== 'object' || e === null)
return e;
if (e.cause)
return getFirstError(e.cause);
if (e instanceof AggregateError)
return getFirstError(e.errors[0]);
function getFirstError(e) {
if (typeof e !== "object" || e === null) return e;
if (e.cause) return getFirstError(e.cause);
if (e instanceof AggregateError) return getFirstError(e.errors[0]);
return e;
}
}
22 changes: 20 additions & 2 deletions packages/preview2-shim/lib/io/worker-thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,26 @@ function handle(call, id, payload) {
switch (call) {
// Http
case HTTP_CREATE_REQUEST: {
const { method, url, headers, body } = payload;
return createFuture(createHttpRequest(method, url, headers, body));
const {
method,
url,
headers,
body,
connectTimeout,
betweenBytesTimeout,
firstByteTimeout,
} = payload;
return createFuture(
createHttpRequest(
method,
url,
headers,
body,
connectTimeout,
betweenBytesTimeout,
firstByteTimeout
)
);
}
case OUTPUT_STREAM_CREATE | HTTP: {
const stream = new PassThrough();
Expand Down
16 changes: 12 additions & 4 deletions packages/preview2-shim/lib/nodejs/http.js
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,10 @@ class OutgoingRequest {
return this.#headers;
}
[symbolDispose]() {}
static _handle(request, _options) {
// TODO: handle options timeouts
static _handle(request, options) {
const connectTimeout = options?.connectTimeoutMs();
const betweenBytesTimeout = options?.betweenBytesTimeoutMs();
const firstByteTimeout = options?.firstByteTimeoutMs();
const scheme = schemeString(request.#scheme);
const url = scheme + request.#authority + (request.#pathWithQuery || "");
const headers = [["host", request.#authority]];
Expand All @@ -285,7 +287,10 @@ class OutgoingRequest {
request.#method.val || request.#method.tag,
url,
headers,
outgoingBodyOutputStreamId(request.#body)
outgoingBodyOutputStreamId(request.#body),
connectTimeout,
betweenBytesTimeout,
firstByteTimeout
);
}
}
Expand Down Expand Up @@ -412,13 +417,16 @@ class FutureIncomingResponse {
[symbolDispose]() {
if (this.#pollId) ioCall(FUTURE_DISPOSE | HTTP, this.#pollId);
}
static _create(method, url, headers, body) {
static _create(method, url, headers, body, connectTimeout, betweenBytesTimeout, firstByteTimeout) {
const res = new FutureIncomingResponse();
res.#pollId = ioCall(HTTP_CREATE_REQUEST, null, {
method,
url,
headers,
body,
connectTimeout,
betweenBytesTimeout,
firstByteTimeout
});
return res;
}
Expand Down

0 comments on commit a13795a

Please sign in to comment.