From a13795ab81f573fbfc731f51c61240da9a8c413c Mon Sep 17 00:00:00 2001 From: Guy Bedford Date: Thu, 14 Dec 2023 13:51:07 -0800 Subject: [PATCH] fix: HTTP agent support, timeouts (#313) --- .github/workflows/main.yml | 27 ++++---- packages/preview2-shim/lib/io/worker-http.js | 68 +++++++++++++------ .../preview2-shim/lib/io/worker-thread.js | 22 +++++- packages/preview2-shim/lib/nodejs/http.js | 16 +++-- 4 files changed, 93 insertions(+), 40 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index a52039b71..0ff5eced4 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -15,18 +15,21 @@ jobs: name: Test strategy: matrix: - name: [ - linux, - windows, - macos + os: [ + ubuntu-latest, + windows-latest, + macos-latest ] - include: - - name: linux - os: ubuntu-latest - - name: windows - os: windows-latest - - name: macos - os: macos-latest + node: [18.x, 20.x, latest] + exclude: + - os: macos-latest + node: 20.x + - os: macos-latest + node: 18.x + - os: windows-latest + node: 20.x + - os: windows-latest + node: 18.x runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v4 @@ -53,7 +56,7 @@ jobs: - uses: actions/setup-node@v3 with: - node-version: 'lts/*' + node-version: ${{ matrix.node }} - name: Install NPM packages run: npm install diff --git a/packages/preview2-shim/lib/io/worker-http.js b/packages/preview2-shim/lib/io/worker-http.js index 000f9b8d1..4453a268d 100644 --- a/packages/preview2-shim/lib/io/worker-http.js +++ b/packages/preview2-shim/lib/io/worker-http.js @@ -1,9 +1,19 @@ import { createStream, getStreamOrThrow } from "./worker-thread.js"; -import { createServer, request as httpRequest } from "node:http"; -import { request as httpsRequest } from "node:https"; +import { + createServer, + request as httpRequest, + Agent as HttpAgent, +} from "node:http"; +import { request as httpsRequest, Agent as HttpsAgent } from "node:https"; import { parentPort } from "node:worker_threads"; import { HTTP_SERVER_INCOMING_HANDLER } from "./calls.js"; +const agentOptions = { + keepAlive: true, +}; +const httpAgent = new HttpAgent(agentOptions); +const httpsAgent = new HttpsAgent(agentOptions); + const servers = new Map(); let responseCnt = 0; @@ -59,7 +69,15 @@ export async function startHttpServer(id, { port, host }) { servers.set(id, server); } -export async function createHttpRequest(method, url, headers, bodyId) { +export async function createHttpRequest( + method, + url, + headers, + bodyId, + connectTimeout, + betweenBytesTimeout, + firstByteTimeout +) { let stream = null; if (bodyId) { try { @@ -84,26 +102,30 @@ export async function createHttpRequest(method, url, headers, bodyId) { const parsedUrl = new URL(url); let req; switch (parsedUrl.protocol) { - case 'http:': + case "http:": req = httpRequest({ + agent: httpAgent, method, host: parsedUrl.hostname, port: parsedUrl.port, path: parsedUrl.pathname + parsedUrl.search, - headers + headers, + timeout: connectTimeout && Number(connectTimeout) }); break; - case 'https:': + case "https:": req = httpsRequest({ + agent: httpsAgent, method, host: parsedUrl.hostname, port: parsedUrl.port, path: parsedUrl.pathname + parsedUrl.search, - headers + headers, + timeout: connectTimeout && Number(connectTimeout) }); break; default: - throw { tag: 'HTTP-protocol-error' }; + throw { tag: "HTTP-protocol-error" }; } if (stream) { stream.pipe(req); @@ -111,11 +133,17 @@ export async function createHttpRequest(method, url, headers, bodyId) { req.end(); } const res = await new Promise((resolve, reject) => { - req.on('response', resolve); - req.on('close', () => reject); - req.on('error', reject); + req.on("response", resolve); + req.on("close", () => reject); + req.on("error", reject); }); - res.on('end', () => void res.emit("readable")); + if (firstByteTimeout) + res.setTimeout(Number(firstByteTimeout)); + if (betweenBytesTimeout) + res.on("readable", () => { + res.setTimeout(Number(betweenBytesTimeout)); + }); + res.on("end", () => void res.emit("readable")); const bodyStreamId = createStream(res); return { status: res.statusCode, @@ -123,8 +151,7 @@ export async function createHttpRequest(method, url, headers, bodyId) { bodyStreamId: bodyStreamId, }; } catch (e) { - if (e?.tag) - throw e; + if (e?.tag) throw e; const err = getFirstError(e); switch (err.code) { case "ECONNRESET": @@ -149,12 +176,9 @@ export async function createHttpRequest(method, url, headers, bodyId) { } } -function getFirstError (e) { - if (typeof e !== 'object' || e === null) - return e; - if (e.cause) - return getFirstError(e.cause); - if (e instanceof AggregateError) - return getFirstError(e.errors[0]); +function getFirstError(e) { + if (typeof e !== "object" || e === null) return e; + if (e.cause) return getFirstError(e.cause); + if (e instanceof AggregateError) return getFirstError(e.errors[0]); return e; -} \ No newline at end of file +} diff --git a/packages/preview2-shim/lib/io/worker-thread.js b/packages/preview2-shim/lib/io/worker-thread.js index 153e089b8..24cb3eca5 100644 --- a/packages/preview2-shim/lib/io/worker-thread.js +++ b/packages/preview2-shim/lib/io/worker-thread.js @@ -206,8 +206,26 @@ function handle(call, id, payload) { switch (call) { // Http case HTTP_CREATE_REQUEST: { - const { method, url, headers, body } = payload; - return createFuture(createHttpRequest(method, url, headers, body)); + const { + method, + url, + headers, + body, + connectTimeout, + betweenBytesTimeout, + firstByteTimeout, + } = payload; + return createFuture( + createHttpRequest( + method, + url, + headers, + body, + connectTimeout, + betweenBytesTimeout, + firstByteTimeout + ) + ); } case OUTPUT_STREAM_CREATE | HTTP: { const stream = new PassThrough(); diff --git a/packages/preview2-shim/lib/nodejs/http.js b/packages/preview2-shim/lib/nodejs/http.js index 1666380a6..e3233cec8 100644 --- a/packages/preview2-shim/lib/nodejs/http.js +++ b/packages/preview2-shim/lib/nodejs/http.js @@ -272,8 +272,10 @@ class OutgoingRequest { return this.#headers; } [symbolDispose]() {} - static _handle(request, _options) { - // TODO: handle options timeouts + static _handle(request, options) { + const connectTimeout = options?.connectTimeoutMs(); + const betweenBytesTimeout = options?.betweenBytesTimeoutMs(); + const firstByteTimeout = options?.firstByteTimeoutMs(); const scheme = schemeString(request.#scheme); const url = scheme + request.#authority + (request.#pathWithQuery || ""); const headers = [["host", request.#authority]]; @@ -285,7 +287,10 @@ class OutgoingRequest { request.#method.val || request.#method.tag, url, headers, - outgoingBodyOutputStreamId(request.#body) + outgoingBodyOutputStreamId(request.#body), + connectTimeout, + betweenBytesTimeout, + firstByteTimeout ); } } @@ -412,13 +417,16 @@ class FutureIncomingResponse { [symbolDispose]() { if (this.#pollId) ioCall(FUTURE_DISPOSE | HTTP, this.#pollId); } - static _create(method, url, headers, body) { + static _create(method, url, headers, body, connectTimeout, betweenBytesTimeout, firstByteTimeout) { const res = new FutureIncomingResponse(); res.#pollId = ioCall(HTTP_CREATE_REQUEST, null, { method, url, headers, body, + connectTimeout, + betweenBytesTimeout, + firstByteTimeout }); return res; }