-
Notifications
You must be signed in to change notification settings - Fork 343
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement Request.signal to detect client disconnects
- Loading branch information
Showing
9 changed files
with
201 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
import { DurableObject, WorkerEntrypoint } from 'cloudflare:workers'; | ||
import assert from 'node:assert'; | ||
export class AbortTracker extends DurableObject { | ||
async getAborted(key) { | ||
return this.ctx.storage.get(key); | ||
} | ||
async setAborted(key, value) { | ||
this.ctx.storage.put(key, value); | ||
} | ||
} | ||
|
||
export class Server extends WorkerEntrypoint { | ||
async fetch(req) { | ||
const key = new URL(req.url).pathname.slice(1); | ||
let abortTracker = this.env.AbortTracker.get( | ||
this.env.AbortTracker.idFromName('AbortTracker') | ||
); | ||
await abortTracker.setAborted(key, false); | ||
req.signal.onabort = () => { | ||
this.ctx.waitUntil(abortTracker.setAborted(key, true)); | ||
}; | ||
return this[key](); | ||
} | ||
|
||
async valid() { | ||
return new Response('hello world'); | ||
} | ||
|
||
async error() { | ||
throw new Error('boom'); | ||
} | ||
|
||
async hang() { | ||
for (;;) { | ||
await new Promise((r) => setTimeout(r, 86400)); | ||
} | ||
} | ||
|
||
async hangAfterSendingSomeData() { | ||
const { readable, writable } = new TransformStream(); | ||
this.ctx.waitUntil(this.sendSomeData(writable)); | ||
|
||
return new Response(readable); | ||
} | ||
|
||
async sendSomeData(writable) { | ||
const writer = writable.getWriter(); | ||
const enc = new TextEncoder(); | ||
await writer.write(enc.encode('hello world')); | ||
await this.hang(); | ||
} | ||
} | ||
|
||
export const noAbortOnSimpleResponse = { | ||
async test(ctrl, env, ctx) { | ||
let abortTracker = env.AbortTracker.get( | ||
env.AbortTracker.idFromName('AbortTracker') | ||
); | ||
|
||
const req = env.Server.fetch('http://example.com/valid'); | ||
|
||
const res = await req; | ||
assert.strictEqual(await res.text(), 'hello world'); | ||
assert.strictEqual(await abortTracker.getAborted('valid'), false); | ||
}, | ||
}; | ||
|
||
export const noAbortIfServerThrows = { | ||
async test(ctrl, env, ctx) { | ||
let abortTracker = env.AbortTracker.get( | ||
env.AbortTracker.idFromName('AbortTracker') | ||
); | ||
|
||
const req = env.Server.fetch('http://example.com/error'); | ||
|
||
await assert.rejects(() => req, { name: 'Error', message: 'boom' }); | ||
assert.strictEqual(await abortTracker.getAborted('error'), false); | ||
}, | ||
}; | ||
|
||
export const abortIfClientAbandonsRequest = { | ||
async test(ctrl, env, ctx) { | ||
let abortTracker = env.AbortTracker.get( | ||
env.AbortTracker.idFromName('AbortTracker') | ||
); | ||
|
||
// This endpoint never generates a response, so we can timeout after an arbitrary time. | ||
const req = env.Server.fetch('http://example.com/hang', { | ||
signal: AbortSignal.timeout(500), | ||
}); | ||
|
||
await assert.rejects(() => req, { | ||
name: 'TimeoutError', | ||
message: 'The operation was aborted due to timeout', | ||
}); | ||
assert.strictEqual(await abortTracker.getAborted('hang'), true); | ||
}, | ||
}; | ||
|
||
export const abortIfClientCancelsReadingResponse = { | ||
async test(ctrl, env, ctx) { | ||
let abortTracker = env.AbortTracker.get( | ||
env.AbortTracker.idFromName('AbortTracker') | ||
); | ||
|
||
// This endpoint begins generating a response but then hangs | ||
const req = env.Server.fetch('http://example.com/hangAfterSendingSomeData'); | ||
const res = await req; | ||
const reader = res.body.getReader(); | ||
|
||
const { value, done } = await reader.read(); | ||
assert.strictEqual(new TextDecoder().decode(value), 'hello world'); | ||
assert.ok(!done); | ||
|
||
// Give up reading | ||
await reader.cancel(); | ||
|
||
// Waste a bit of time so the server cleans up | ||
await new Promise((r) => setImmediate(r)); | ||
|
||
assert.strictEqual( | ||
await abortTracker.getAborted('hangAfterSendingSomeData'), | ||
true | ||
); | ||
}, | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
using Workerd = import "/workerd/workerd.capnp"; | ||
|
||
const unitTests :Workerd.Config = ( | ||
services = [ | ||
( name = "request-client-disconnect", | ||
worker = ( | ||
modules = [ | ||
(name = "worker", esModule = embed "request-client-disconnect.js" ) | ||
], | ||
compatibilityDate = "2025-01-01", | ||
compatibilityFlags = ["nodejs_compat", "experimental"], | ||
durableObjectNamespaces = [ | ||
(className = "AbortTracker", uniqueKey = "badbeef"), | ||
], | ||
durableObjectStorage = (inMemory = void), | ||
bindings = [ | ||
(name = "AbortTracker", durableObjectNamespace = "AbortTracker"), | ||
(name = "Server", service = (name = "request-client-disconnect", entrypoint = "Server")), | ||
(name = "defaultExport", service = "request-client-disconnect"), | ||
] | ||
) | ||
) | ||
] | ||
); | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters