-
Notifications
You must be signed in to change notification settings - Fork 343
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement Request.signal to detect client disconnects #3488
Open
npaun
wants to merge
2
commits into
main
Choose a base branch
from
npaun/request-signal
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+306
−16
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,188 @@ | ||
import { DurableObject, WorkerEntrypoint } from 'cloudflare:workers'; | ||
import assert from 'node:assert'; | ||
import { scheduler } from 'node:timers/promises'; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FWIW, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. VSCode just helpfully keeps importing it. I'll get rid of it before I merge. |
||
|
||
export class AbortTracker extends DurableObject { | ||
async getAborted(key) { | ||
return this.ctx.storage.get(key); | ||
} | ||
async setAborted(key, value) { | ||
await this.ctx.storage.put(key, value); | ||
} | ||
} | ||
export class OtherServer extends WorkerEntrypoint { | ||
async fetch() { | ||
await scheduler.wait(300); | ||
return new Response('completed'); | ||
} | ||
} | ||
|
||
export class Server extends WorkerEntrypoint { | ||
async fetch(req) { | ||
const key = new URL(req.url).pathname.slice(1); | ||
let abortTracker = this.env.AbortTracker.get( | ||
this.env.AbortTracker.idFromName('AbortTracker') | ||
); | ||
await abortTracker.setAborted(key, false); | ||
|
||
req.signal.onabort = () => { | ||
this.ctx.waitUntil(abortTracker.setAborted(key, true)); | ||
}; | ||
|
||
return this[key](req); | ||
} | ||
|
||
async valid() { | ||
return new Response('hello world'); | ||
} | ||
|
||
async error() { | ||
throw new Error('boom'); | ||
} | ||
|
||
async hang() { | ||
for (;;) { | ||
await scheduler.wait(86400); | ||
} | ||
} | ||
|
||
async hangAfterSendingSomeData() { | ||
const { readable, writable } = new TransformStream(); | ||
this.ctx.waitUntil(this.sendSomeData(writable)); | ||
|
||
return new Response(readable); | ||
} | ||
|
||
async sendSomeData(writable) { | ||
const writer = writable.getWriter(); | ||
const enc = new TextEncoder(); | ||
await writer.write(enc.encode('hello world')); | ||
await this.hang(); | ||
} | ||
|
||
async triggerSubrequest(req) { | ||
this.ctx.waitUntil(this.callOtherServer(req)); | ||
await this.hang(); | ||
} | ||
|
||
async callOtherServer(req) { | ||
const key = 'subrequest'; | ||
|
||
let abortTracker = this.env.AbortTracker.get( | ||
this.env.AbortTracker.idFromName('AbortTracker') | ||
); | ||
|
||
const passedThroughReq = new Request(req); | ||
passedThroughReq.onabort = () => { | ||
this.ctx.waitUntil(abortTracker.setAborted(key, true)); | ||
}; | ||
|
||
const res = await this.env.OtherServer.fetch(passedThroughReq); | ||
const text = await res.text(); | ||
|
||
if (text == 'completed') { | ||
await abortTracker.setAborted(key, false); | ||
} | ||
} | ||
} | ||
|
||
export const noAbortOnSimpleResponse = { | ||
async test(ctrl, env, ctx) { | ||
let abortTracker = env.AbortTracker.get( | ||
env.AbortTracker.idFromName('AbortTracker') | ||
); | ||
|
||
const req = env.Server.fetch('http://example.com/valid'); | ||
|
||
const res = await req; | ||
assert.strictEqual(await res.text(), 'hello world'); | ||
assert.strictEqual(await abortTracker.getAborted('valid'), false); | ||
}, | ||
}; | ||
|
||
export const noAbortIfServerThrows = { | ||
async test(ctrl, env, ctx) { | ||
let abortTracker = env.AbortTracker.get( | ||
env.AbortTracker.idFromName('AbortTracker') | ||
); | ||
|
||
const req = env.Server.fetch('http://example.com/error'); | ||
|
||
await assert.rejects(() => req, { name: 'Error', message: 'boom' }); | ||
assert.strictEqual(await abortTracker.getAborted('error'), false); | ||
}, | ||
}; | ||
|
||
export const abortIfClientAbandonsRequest = { | ||
async test(ctrl, env, ctx) { | ||
let abortTracker = env.AbortTracker.get( | ||
env.AbortTracker.idFromName('AbortTracker') | ||
); | ||
|
||
// This endpoint never generates a response, so we can timeout after an arbitrary time. | ||
const req = env.Server.fetch('http://example.com/hang', { | ||
signal: AbortSignal.timeout(500), | ||
}); | ||
|
||
await assert.rejects(() => req, { | ||
name: 'TimeoutError', | ||
message: 'The operation was aborted due to timeout', | ||
}); | ||
assert.strictEqual(await abortTracker.getAborted('hang'), true); | ||
}, | ||
}; | ||
|
||
export const abortIfClientCancelsReadingResponse = { | ||
async test(ctrl, env, ctx) { | ||
let abortTracker = env.AbortTracker.get( | ||
env.AbortTracker.idFromName('AbortTracker') | ||
); | ||
|
||
// This endpoint begins generating a response but then hangs | ||
const req = env.Server.fetch('http://example.com/hangAfterSendingSomeData'); | ||
const res = await req; | ||
const reader = res.body.getReader(); | ||
|
||
const { value, done } = await reader.read(); | ||
assert.strictEqual(new TextDecoder().decode(value), 'hello world'); | ||
assert.ok(!done); | ||
|
||
// Give up reading | ||
await reader.cancel(); | ||
|
||
// Waste a bit of time so the server cleans up | ||
await scheduler.wait(0); | ||
|
||
assert.strictEqual( | ||
await abortTracker.getAborted('hangAfterSendingSomeData'), | ||
true | ||
); | ||
}, | ||
}; | ||
|
||
export const abortedRequestDoesNotAbortSubrequest = { | ||
async test(ctrl, env, ctx) { | ||
let abortTracker = env.AbortTracker.get( | ||
env.AbortTracker.idFromName('AbortTracker') | ||
); | ||
|
||
// This endpoint calls another endpoint that eventually completes after wasting 300 ms | ||
// So, we abort the initial request quickly... | ||
const req = env.Server.fetch('http://example.com/triggerSubrequest', { | ||
signal: AbortSignal.timeout(100), | ||
}); | ||
|
||
await assert.rejects(() => req, { | ||
name: 'TimeoutError', | ||
message: 'The operation was aborted due to timeout', | ||
}); | ||
assert.strictEqual( | ||
await abortTracker.getAborted('triggerSubrequest'), | ||
true | ||
); | ||
|
||
// Then make sure that the subrequest wasn't also aborted | ||
await scheduler.wait(500); | ||
assert.strictEqual(await abortTracker.getAborted('subrequest'), false); | ||
}, | ||
}; |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm treating
req.clone()
andnew Request(req)
differently in that former always clones the associated signal, while the latter will respectignoreForSubrequests
and ignore the signal of the incoming fetch. Let me know if that doesn't sound right.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, this is not the solution I was expecting. I thought that the implementation of
fetch()
would be modified to ignore the signal if it came from the original top-level request. Why should cloning the request (whether via.clone()
or the constructor) drop the signal?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually if someone just does
fetch(originalRequest)
, the request is not cloned at all, so doesn't this not even solve that case?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does work because fetch calls the Request constructor again, but yeah this isn't the right way to accomplish this. I fixed it.