Skip to content

Commit

Permalink
PumpMessageLoop() at the end of request lifecycle
Browse files Browse the repository at this point in the history
  • Loading branch information
ketanhwr committed Feb 26, 2025
1 parent 0d6cd71 commit bb34200
Show file tree
Hide file tree
Showing 12 changed files with 96 additions and 108 deletions.
1 change: 0 additions & 1 deletion samples/helloworld_esm/config.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ using Workerd = import "/workerd/workerd.capnp";
# instance of the workerd runtime. A single config file can contain multiple
# Workerd.Config definitions and must have at least one.
const helloWorldExample :Workerd.Config = (
v8Flags = [ "--expose-gc" ],

# Every workerd instance consists of a set of named services. A worker, for instance,
# is a type of service. Other types of services can include external servers, the
Expand Down
78 changes: 1 addition & 77 deletions samples/helloworld_esm/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,82 +4,6 @@

export default {
async fetch(req, env) {
let resp = await fetch("https://example.com");

// GC TEST
let cleanup0_call_count = 0;
let cleanup1_call_count = 0;

let cleanup0 = function (holdings) {
++cleanup0_call_count;
}

let cleanup1 = function (holdings) {
++cleanup1_call_count;
}

let fg0 = new FinalizationRegistry(cleanup0);
let fg1 = new FinalizationRegistry(cleanup1);

// Register 1 weak reference for each FinalizationRegistry and kill the
// objects they point to.
(function () {
// The objects need to be inside a closure so that we can reliably kill
// them.
let objects = [];
objects[0] = {};
objects[1] = {};
fg0.register(objects[0], "holdings0-0");
fg1.register(objects[1], "holdings1-0");
// Drop the references to the objects.
objects = [];
})();

// Schedule a GC, which will schedule both fg0 and fg1 for cleanup.
// Here and below, we need to invoke GC asynchronously and wait for it to
// finish, so that it doesn't need to scan the stack. Otherwise, the objects
// may not be reclaimed because of conservative stack scanning and the test
// may not work as intended.
let task_1_gc = (async function () {
gc({ type: 'major', execution: 'async' });

// Before the cleanup task has a chance to run, do the same thing again, so
// both FinalizationRegistries are (again) scheduled for cleanup. This has to
// be a IIFE function (so that we can reliably kill the objects) so we cannot
// use the same function as before.
(function () {
let objects = [];
objects[0] = {};
objects[1] = {};
fg0.register(objects[0], "holdings0-1");
fg1.register(objects[1], "holdings1-1");
objects = [];
})();
})();

// Schedule a second GC for execution after that, which will again schedule
// both fg0 and fg1 for cleanup.
let task_2_gc = (async function () {
gc({ type: 'major', execution: 'async' });

// Check that no cleanup task has had the chance to run yet.
console.log("Expected 0:" + cleanup0_call_count);
console.log("Expected 0:" + cleanup1_call_count);
})();

// Wait for the two GCs to be executed.
await task_1_gc;
await task_2_gc;

// Wait two ticks, so that the finalization registry cleanup has an
// opportunity to both run and re-post itself.
await new Promise(resolve=>setTimeout(resolve, 1000));
await new Promise(resolve=>setTimeout(resolve, 1000));

console.log("Expected 2:" + cleanup0_call_count);
console.log("Expected 2:" + cleanup1_call_count);
// GC TEST

return resp;
return new Response("Hello World\n");
}
};
1 change: 0 additions & 1 deletion src/workerd/api/global-scope.c++
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@ kj::Promise<DeferredProxy<void>> ServiceWorkerGlobalScope::request(kj::HttpMetho
return DeferredProxy<void>{promise.attach(kj::mv(adapter), kj::mv(client))};
}
} else KJ_IF_SOME(promise, event->getResponsePromise(lock)) {

auto body2 = kj::addRef(*ownRequestBody);

// HACK: If the client disconnects, the `response` reference is no longer valid. But our
Expand Down
5 changes: 5 additions & 0 deletions src/workerd/io/compatibility-date.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -692,4 +692,9 @@ struct CompatibilityFlags @0x8f8c1b68151b6cef {
$experimental
$neededByFl;
# Enables cache settings specified request in fetch api cf object to override cache rules. (only for user owned or grey-clouded sites)

jsWeakRef @73 :Bool
$compatEnableFlag("enable_weak_ref")
$compatDisableFlag("disable_weak_ref");
# Enables WeakRefs and FinalizationRegistry API. WebAssembly based projects often rely on this API for wasm memory cleanup
}
87 changes: 69 additions & 18 deletions src/workerd/io/io-context.c++
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@

#include "io-context.h"

#include "libplatform/libplatform.h"
#include "v8-platform.h" // NOLINT(build/include_directory)
#include "v8config.h" // NOLINT(build/include_directory)
#include <workerd/io/io-gate.h>
#include <workerd/io/worker.h>
#include <workerd/jsg/jsg.h>
Expand Down Expand Up @@ -451,6 +448,7 @@ kj::Promise<void> IoContext::IncomingRequest::drain() {
// For non-actor requests, apply the configured soft timeout, typically 30 seconds.
timeoutPromise = context->limitEnforcer->limitDrain();
}
context->pumpMessageLoop();
return context->waitUntilTasks.onEmpty()
.exclusiveJoin(kj::mv(timeoutPromise))
.exclusiveJoin(context->abortPromise.addBranch().then([] {}, [](kj::Exception&&) {}));
Expand All @@ -470,6 +468,7 @@ kj::Promise<IoContext_IncomingRequest::FinishScheduledResult> IoContext::Incomin
// Mark ourselves so we know that we made a best effort attempt to wait for waitUntilTasks.
KJ_ASSERT(context->incomingRequests.size() == 1);
context->incomingRequests.front().waitedForWaitUntil = true;
context->pumpMessageLoop();

auto timeoutPromise = context->limitEnforcer->limitScheduled().then(
[] { return IoContext_IncomingRequest::FinishScheduledResult::TIMEOUT; });
Expand Down Expand Up @@ -1159,17 +1158,6 @@ void IoContext::runImpl(Runnable& runnable,
}
}
});

if (!isCurrentNull()) {
KJ_LOG(ERROR, "IoContext not-null before running PumpMessageLoop()");
} else {
worker->runInLockScope(lockType, [&](Worker::Lock& lock) {
jsg::Lock& js = lock;
auto& system = const_cast<jsg::V8System&>(js.getV8System());
KJ_DBG(js.v8Isolate);
while (v8::platform::PumpMessageLoop(&system.getDefaultPlatform(), js.v8Isolate, v8::platform::MessageLoopBehavior::kDoNotWait)) {}
});
}
}

static constexpr auto kAsyncIoErrorMessage =
Expand Down Expand Up @@ -1197,10 +1185,6 @@ bool IoContext::hasCurrent() {
return threadLocalRequest != nullptr;
}

bool IoContext::isCurrentNull() {
return threadLocalRequest == nullptr;
}

bool IoContext::isCurrent() {
return this == threadLocalRequest;
}
Expand Down Expand Up @@ -1396,6 +1380,73 @@ kj::Promise<void> IoContext::startDeleteQueueSignalTask(IoContext* context) {
}
}

void IoContext::pumpMessageLoop() {
kj::Promise<Worker::AsyncLock> asyncLockPromise = nullptr;
KJ_IF_SOME(a, actor) {
asyncLockPromise = worker->takeAsyncLockWhenActorCacheReady(now(), a, getMetrics());
} else {
asyncLockPromise = worker->takeAsyncLock(getMetrics());
}

addWaitUntil(asyncLockPromise.then([](Worker::AsyncLock lock) {
return lock;
}).then([this](Worker::AsyncLock lock) mutable {
KJ_REQUIRE(threadId == getThreadId(), "IoContext cannot switch threads");
IoContext* previousRequest = threadLocalRequest;
KJ_DEFER(threadLocalRequest = previousRequest);
threadLocalRequest = nullptr;

worker->runInLockScope(lock, [&](Worker::Lock& workerLock) {
workerLock.requireNoPermanentException();

auto limiterScope = limitEnforcer->enterJs(workerLock, *this);

bool gotTermination = false;

KJ_DEFER({
jsg::Lock& js = workerLock;
if (gotTermination) {
js.terminateExecution();
}
});

v8::TryCatch tryCatch(workerLock.getIsolate());
try {
jsg::Lock& js = workerLock;
js.pumpMessageLoop();
} catch (const jsg::JsExceptionThrown&) {
if (tryCatch.HasTerminated()) {
gotTermination = true;
limiterScope = nullptr;

limitEnforcer->requireLimitsNotExceeded();

if (!abortFulfiller->isWaiting()) {
KJ_FAIL_ASSERT("request terminated because it was aborted");
}

// That should have thrown, so we shouldn't get here.
KJ_FAIL_ASSERT("script terminated for unknown reasons");
} else {
if (tryCatch.Message().IsEmpty()) {
// Should never happen, but check for it because otherwise V8 will crash.
KJ_LOG(ERROR, "tryCatch.Message() was empty even when not HasTerminated()??",
kj::getStackTrace());
JSG_FAIL_REQUIRE(Error, "(JavaScript exception with no message)");
} else {
auto jsException = tryCatch.Exception();

workerLock.logUncaughtException(UncaughtExceptionSource::INTERNAL,
jsg::JsValue(jsException), jsg::JsMessage(tryCatch.Message()));

jsg::throwTunneledException(workerLock.getIsolate(), jsException);
}
}
}
});
}));
}

// ======================================================================================

WarningAggregator::WarningAggregator(IoContext& context, EmitCallback emitter)
Expand Down
5 changes: 2 additions & 3 deletions src/workerd/io/io-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -376,9 +376,6 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
// True if there is a current IoContext for the thread (current() will not throw).
static bool hasCurrent();

// True if there is no IoContext for this thread
static bool isCurrentNull();

// True if this is the IoContext for the current thread (same as `hasCurrent() && tcx == current()`).
bool isCurrent();

Expand Down Expand Up @@ -828,6 +825,8 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
return *getCurrentIncomingRequest().ioChannelFactory;
}

void pumpMessageLoop();

private:
ThreadContext& thread;

Expand Down
7 changes: 7 additions & 0 deletions src/workerd/jsg/jsg.c++
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <workerd/jsg/util.h>
#include <workerd/util/thread-scopes.h>

#include "libplatform/libplatform.h"

namespace workerd::jsg {

kj::String stringifyHandle(v8::Local<v8::Value> value) {
Expand Down Expand Up @@ -270,6 +272,11 @@ void Lock::terminateExecution() {
v8Isolate->TerminateExecution();
}

void Lock::pumpMessageLoop() {
auto& system = const_cast<jsg::V8System&>(this->getV8System());
while (v8::platform::PumpMessageLoop(&system.getDefaultPlatform(), v8Isolate)) {}
}

Name Lock::newSymbol(kj::StringPtr symbol) {
return Name(*this, v8::Symbol::New(v8Isolate, v8StrIntern(v8Isolate, symbol)));
}
Expand Down
1 change: 1 addition & 0 deletions src/workerd/jsg/jsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -2678,6 +2678,7 @@ class Lock {

void runMicrotasks();
void terminateExecution();
void pumpMessageLoop();

// Logs and reports the error to tail workers (if called within an request),
// the inspector (if attached), or to KJ_LOG(Info).
Expand Down
9 changes: 5 additions & 4 deletions src/workerd/jsg/resource.h
Original file line number Diff line number Diff line change
Expand Up @@ -1285,6 +1285,7 @@ class ModuleRegistryBase {

struct NewContextOptions {
kj::Maybe<ModuleRegistryBase&> newModuleRegistry = kj::none;
bool enableWeakRef = false;
};

// TypeWrapper mixin for resource types (application-defined C++ classes declared with a
Expand Down Expand Up @@ -1428,10 +1429,10 @@ class ResourceWrapper {
// "skip callback and just allow".)
context->AllowCodeGenerationFromStrings(false);

// We do not allow use of WeakRef or FinalizationRegistry because they introduce
// non-deterministic behavior.
check(global->Delete(context, v8StrIntern(isolate, "WeakRef"_kj)));
//check(global->Delete(context, v8StrIntern(isolate, "FinalizationRegistry"_kj)));
if (!options.enableWeakRef) {
check(global->Delete(context, v8StrIntern(isolate, "WeakRef"_kj)));
check(global->Delete(context, v8StrIntern(isolate, "FinalizationRegistry"_kj)));
}

// Store a pointer to this object in slot 1, to be extracted in callbacks.
context->SetAlignedPointerInEmbedderData(1, ptr.get());
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/jsg/setup.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class V8System {
explicit V8System(v8::Platform& platform);

// Use a possibly-custom v8::Platform implementation, and apply flags.
explicit V8System(v8::Platform& platform, kj::ArrayPtr<const kj::StringPtr> flags, v8::Platform* defuaultPlatformPtr = nullptr);
explicit V8System(v8::Platform& platform, kj::ArrayPtr<const kj::StringPtr> flags, v8::Platform* defaultPlatformPtr = nullptr);

~V8System() noexcept(false);

Expand Down
2 changes: 0 additions & 2 deletions src/workerd/server/v8-platform-impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ class WorkerdPlatform final: public v8::Platform {
return inner.GetTracingController();
}

auto& getInner() { return inner; }

private:
v8::Platform& inner;
};
Expand Down
6 changes: 5 additions & 1 deletion src/workerd/server/workerd-api.c++
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,10 @@ struct WorkerdApi::Impl final {
auto version = getPythonBundleName(pythonRelease);
auto bundle = KJ_ASSERT_NONNULL(
fetchPyodideBundle(pythonConfig, version), "Failed to get Pyodide bundle");
auto context = lock.newContext<api::ServiceWorkerGlobalScope>(lock.v8Isolate);
jsg::NewContextOptions options{
.enableWeakRef = features->getJsWeakRef()
};
auto context = lock.newContext<api::ServiceWorkerGlobalScope>(options, lock.v8Isolate);
v8::Context::Scope scope(context.getHandle(lock));
// Init emscripten synchronously, the python script will import setup-emscripten and
// call setEmscriptenModele
Expand Down Expand Up @@ -343,6 +346,7 @@ CompatibilityFlags::Reader WorkerdApi::getFeatureFlags() const {
jsg::JsContext<api::ServiceWorkerGlobalScope> WorkerdApi::newContext(jsg::Lock& lock) const {
jsg::NewContextOptions options{
.newModuleRegistry = impl->tryGetModuleRegistry(),
.enableWeakRef = getFeatureFlags().getJsWeakRef()
};
return kj::downcast<JsgWorkerdIsolate::Lock>(lock).newContext<api::ServiceWorkerGlobalScope>(
kj::mv(options), lock.v8Isolate);
Expand Down

0 comments on commit bb34200

Please sign in to comment.