Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable FinalizationRegistry #3560

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/workerd/io/compatibility-date.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -692,4 +692,9 @@ struct CompatibilityFlags @0x8f8c1b68151b6cef {
$experimental
$neededByFl;
# Enables cache settings specified request in fetch api cf object to override cache rules. (only for user owned or grey-clouded sites)

jsWeakRef @73 :Bool
$compatEnableFlag("enable_weak_ref")
$compatDisableFlag("disable_weak_ref");
# Enables WeakRefs and FinalizationRegistry API. WebAssembly based projects often rely on this API for wasm memory cleanup
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a compatibility date as well (I think we should?)? How exactly do we decide what goes behind a date vs flag vs both?

}
70 changes: 70 additions & 0 deletions src/workerd/io/io-context.c++
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <workerd/jsg/jsg.h>
#include <workerd/util/sentry.h>
#include <workerd/util/uncaught-exception-source.h>
#include <workerd/jsg/setup.h>

#include <kj/debug.h>

Expand Down Expand Up @@ -447,6 +448,7 @@ kj::Promise<void> IoContext::IncomingRequest::drain() {
// For non-actor requests, apply the configured soft timeout, typically 30 seconds.
timeoutPromise = context->limitEnforcer->limitDrain();
}
context->pumpMessageLoop();
return context->waitUntilTasks.onEmpty()
.exclusiveJoin(kj::mv(timeoutPromise))
.exclusiveJoin(context->abortPromise.addBranch().then([] {}, [](kj::Exception&&) {}));
Expand All @@ -466,6 +468,7 @@ kj::Promise<IoContext_IncomingRequest::FinishScheduledResult> IoContext::Incomin
// Mark ourselves so we know that we made a best effort attempt to wait for waitUntilTasks.
KJ_ASSERT(context->incomingRequests.size() == 1);
context->incomingRequests.front().waitedForWaitUntil = true;
context->pumpMessageLoop();

auto timeoutPromise = context->limitEnforcer->limitScheduled().then(
[] { return IoContext_IncomingRequest::FinishScheduledResult::TIMEOUT; });
Expand Down Expand Up @@ -1377,6 +1380,73 @@ kj::Promise<void> IoContext::startDeleteQueueSignalTask(IoContext* context) {
}
}

void IoContext::pumpMessageLoop() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a some duplication here with IoContext::runImpl(), but ensuring there's no IoContext while reusing those functions would require specializing run(), runImpl() as well as runInContextScope() to pass through a flag indicating a null IoContext. runInContextScope() also runs the code using JSG_WITHIN_CONTEXT_SCOPE which I don't think is necessarily required to pump the message loop.

kj::Promise<Worker::AsyncLock> asyncLockPromise = nullptr;
KJ_IF_SOME(a, actor) {
asyncLockPromise = worker->takeAsyncLockWhenActorCacheReady(now(), a, getMetrics());
} else {
asyncLockPromise = worker->takeAsyncLock(getMetrics());
}

addWaitUntil(asyncLockPromise.then([](Worker::AsyncLock lock) {
return lock;
}).then([this](Worker::AsyncLock lock) mutable {
KJ_REQUIRE(threadId == getThreadId(), "IoContext cannot switch threads");
IoContext* previousRequest = threadLocalRequest;
KJ_DEFER(threadLocalRequest = previousRequest);
threadLocalRequest = nullptr;

worker->runInLockScope(lock, [&](Worker::Lock& workerLock) {
workerLock.requireNoPermanentException();

auto limiterScope = limitEnforcer->enterJs(workerLock, *this);

bool gotTermination = false;

KJ_DEFER({
jsg::Lock& js = workerLock;
if (gotTermination) {
js.terminateExecution();
}
});

v8::TryCatch tryCatch(workerLock.getIsolate());
try {
jsg::Lock& js = workerLock;
js.pumpMessageLoop();
} catch (const jsg::JsExceptionThrown&) {
if (tryCatch.HasTerminated()) {
gotTermination = true;
limiterScope = nullptr;

limitEnforcer->requireLimitsNotExceeded();

if (!abortFulfiller->isWaiting()) {
KJ_FAIL_ASSERT("request terminated because it was aborted");
}

// That should have thrown, so we shouldn't get here.
KJ_FAIL_ASSERT("script terminated for unknown reasons");
} else {
if (tryCatch.Message().IsEmpty()) {
// Should never happen, but check for it because otherwise V8 will crash.
KJ_LOG(ERROR, "tryCatch.Message() was empty even when not HasTerminated()??",
kj::getStackTrace());
JSG_FAIL_REQUIRE(Error, "(JavaScript exception with no message)");
} else {
auto jsException = tryCatch.Exception();

workerLock.logUncaughtException(UncaughtExceptionSource::INTERNAL,
jsg::JsValue(jsException), jsg::JsMessage(tryCatch.Message()));

jsg::throwTunneledException(workerLock.getIsolate(), jsException);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is copied over from runImpl() but I don't think it would be needed here since it runs after the request handler has run.. (but I don't fully understand the implications here)

}
}
}
});
}));
}

// ======================================================================================

WarningAggregator::WarningAggregator(IoContext& context, EmitCallback emitter)
Expand Down
2 changes: 2 additions & 0 deletions src/workerd/io/io-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,8 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
return *getCurrentIncomingRequest().ioChannelFactory;
}

void pumpMessageLoop();

private:
ThreadContext& thread;

Expand Down
7 changes: 7 additions & 0 deletions src/workerd/jsg/jsg.c++
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <workerd/jsg/util.h>
#include <workerd/util/thread-scopes.h>

#include "libplatform/libplatform.h"

namespace workerd::jsg {

kj::String stringifyHandle(v8::Local<v8::Value> value) {
Expand Down Expand Up @@ -270,6 +272,11 @@ void Lock::terminateExecution() {
v8Isolate->TerminateExecution();
}

void Lock::pumpMessageLoop() {
auto& system = const_cast<jsg::V8System&>(this->getV8System());
while (v8::platform::PumpMessageLoop(&system.getDefaultPlatform(), v8Isolate)) {}
}

Name Lock::newSymbol(kj::StringPtr symbol) {
return Name(*this, v8::Symbol::New(v8Isolate, v8StrIntern(v8Isolate, symbol)));
}
Expand Down
6 changes: 6 additions & 0 deletions src/workerd/jsg/jsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -2276,6 +2276,8 @@ class ExternalMemoryAdjustment final {
// Isolate<TypeWrapper>::Lock. Usually this is only done in top-level code, and the Lock is
// passed down to everyone else from there. See setup.h for details.

class V8System;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should be exposing V8System like this. The intent of JSG is to abstract V8 specifics away as much as possible and this just exposes them more. If we have to expose something here, which I'm unsure about, then the functionality should be folded into a new jsg::Lock method... like js.pumpMessageLoop() so that the details of interfacing with the v8 APIS do not need to leak more out to other areas.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree. js.pumpMessageLoop() was the intended plan before I sent this out for review 😅


class Lock {
public:
// The underlying V8 isolate, useful for directly calling V8 APIs. Hopefully, this is rarely
Expand Down Expand Up @@ -2674,11 +2676,15 @@ class Lock {

void runMicrotasks();
void terminateExecution();
void pumpMessageLoop();

// Logs and reports the error to tail workers (if called within an request),
// the inspector (if attached), or to KJ_LOG(Info).
virtual void reportError(const JsValue& value) = 0;

protected:
virtual const V8System& getV8System() = 0;

private:
// Mark the jsg::Lock as being disallowed from being passed as a parameter into
// a kj promise coroutine. Note that this only blocks directly passing the Lock
Expand Down
9 changes: 5 additions & 4 deletions src/workerd/jsg/resource.h
Original file line number Diff line number Diff line change
Expand Up @@ -1285,6 +1285,7 @@ class ModuleRegistryBase {

struct NewContextOptions {
kj::Maybe<ModuleRegistryBase&> newModuleRegistry = kj::none;
bool enableWeakRef = false;
};

// TypeWrapper mixin for resource types (application-defined C++ classes declared with a
Expand Down Expand Up @@ -1428,10 +1429,10 @@ class ResourceWrapper {
// "skip callback and just allow".)
context->AllowCodeGenerationFromStrings(false);

// We do not allow use of WeakRef or FinalizationRegistry because they introduce
// non-deterministic behavior.
check(global->Delete(context, v8StrIntern(isolate, "WeakRef"_kj)));
check(global->Delete(context, v8StrIntern(isolate, "FinalizationRegistry"_kj)));
if (!options.enableWeakRef) {
check(global->Delete(context, v8StrIntern(isolate, "WeakRef"_kj)));
check(global->Delete(context, v8StrIntern(isolate, "FinalizationRegistry"_kj)));
}

// Store a pointer to this object in slot 1, to be extracted in callbacks.
context->SetAlignedPointerInEmbedderData(1, ptr.get());
Expand Down
11 changes: 7 additions & 4 deletions src/workerd/jsg/setup.c++
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,14 @@ static kj::Own<v8::Platform> userPlatform(v8::Platform& platform) {
V8System::V8System(): V8System(defaultPlatform(0), nullptr) {}
V8System::V8System(kj::ArrayPtr<const kj::StringPtr> flags): V8System(defaultPlatform(0), flags) {}
V8System::V8System(v8::Platform& platformParam): V8System(platformParam, nullptr) {}
V8System::V8System(v8::Platform& platformParam, kj::ArrayPtr<const kj::StringPtr> flags)
: V8System(userPlatform(platformParam), flags) {}
V8System::V8System(kj::Own<v8::Platform> platformParam, kj::ArrayPtr<const kj::StringPtr> flags)
V8System::V8System(v8::Platform& platformParam, kj::ArrayPtr<const kj::StringPtr> flags, v8::Platform* defaultPlatformPtr)
: V8System(userPlatform(platformParam), flags, defaultPlatformPtr) {}
V8System::V8System(kj::Own<v8::Platform> platformParam, kj::ArrayPtr<const kj::StringPtr> flags, v8::Platform* defaultPlatformPtr)
: platformInner(kj::mv(platformParam)),
platformWrapper(*platformInner) {
platformWrapper(*platformInner), defaultPlatformPtr_{defaultPlatformPtr} {
if (!defaultPlatformPtr_) {
defaultPlatformPtr_ = platformInner.get();
}
#if V8_HAS_STACK_START_MARKER
v8::StackStartMarker::EnableForProcess();
#endif
Expand Down
15 changes: 13 additions & 2 deletions src/workerd/jsg/setup.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,22 @@ class V8System {
explicit V8System(v8::Platform& platform);

// Use a possibly-custom v8::Platform implementation, and apply flags.
explicit V8System(v8::Platform& platform, kj::ArrayPtr<const kj::StringPtr> flags);
explicit V8System(v8::Platform& platform, kj::ArrayPtr<const kj::StringPtr> flags, v8::Platform* defaultPlatformPtr = nullptr);

~V8System() noexcept(false);

typedef void FatalErrorCallback(kj::StringPtr location, kj::StringPtr message);
static void setFatalErrorCallback(FatalErrorCallback* callback);

auto& getDefaultPlatform() { return *defaultPlatformPtr_; }

private:
kj::Own<v8::Platform> platformInner;
V8PlatformWrapper platformWrapper;
friend class IsolateBase;
v8::Platform* defaultPlatformPtr_;

explicit V8System(kj::Own<v8::Platform>, kj::ArrayPtr<const kj::StringPtr>);
explicit V8System(kj::Own<v8::Platform>, kj::ArrayPtr<const kj::StringPtr>, v8::Platform* defaultPlatformPtr = nullptr);
};

// Base class of Isolate<T> containing parts that don't need to be templated, to avoid code
Expand All @@ -73,6 +76,10 @@ class IsolateBase {
public:
static IsolateBase& from(v8::Isolate* isolate);

auto& getV8System() {
return system;
}

// Unwraps a JavaScript exception as a kj::Exception.
virtual kj::Exception unwrapException(
v8::Local<v8::Context> context, v8::Local<v8::Value> exception) = 0;
Expand Down Expand Up @@ -673,6 +680,10 @@ class Isolate: public IsolateBase {
*this, value.toString(*this), value, JsMessage::create(*this, value));
}
}
protected:
virtual const V8System& getV8System() override {
return jsgIsolate.getV8System();
}

private:
Isolate& jsgIsolate;
Expand Down
6 changes: 5 additions & 1 deletion src/workerd/server/workerd-api.c++
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,10 @@ struct WorkerdApi::Impl final {
auto version = getPythonBundleName(pythonRelease);
auto bundle = KJ_ASSERT_NONNULL(
fetchPyodideBundle(pythonConfig, version), "Failed to get Pyodide bundle");
auto context = lock.newContext<api::ServiceWorkerGlobalScope>(lock.v8Isolate);
jsg::NewContextOptions options{
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we enable this for python workers unconditionally or behind a flag as well? I think pyodide already has its own dummy FinalizationRegistry implementation
cc: @hoodmane

.enableWeakRef = features->getJsWeakRef()
};
auto context = lock.newContext<api::ServiceWorkerGlobalScope>(options, lock.v8Isolate);
v8::Context::Scope scope(context.getHandle(lock));
// Init emscripten synchronously, the python script will import setup-emscripten and
// call setEmscriptenModele
Expand Down Expand Up @@ -343,6 +346,7 @@ CompatibilityFlags::Reader WorkerdApi::getFeatureFlags() const {
jsg::JsContext<api::ServiceWorkerGlobalScope> WorkerdApi::newContext(jsg::Lock& lock) const {
jsg::NewContextOptions options{
.newModuleRegistry = impl->tryGetModuleRegistry(),
.enableWeakRef = getFeatureFlags().getJsWeakRef()
};
return kj::downcast<JsgWorkerdIsolate::Lock>(lock).newContext<api::ServiceWorkerGlobalScope>(
kj::mv(options), lock.v8Isolate);
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/server/workerd.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1329,7 +1329,7 @@ class CliMain final: public SchemaFileImpl::ErrorReporter {
auto platform = jsg::defaultPlatform(0);
WorkerdPlatform v8Platform(*platform);
jsg::V8System v8System(
v8Platform, KJ_MAP(flag, config.getV8Flags()) -> kj::StringPtr { return flag; });
v8Platform, KJ_MAP(flag, config.getV8Flags()) -> kj::StringPtr { return flag; }, platform.get());
auto promise = func(v8System, config);
KJ_IF_SOME(w, watcher) {
promise = promise.exclusiveJoin(waitForChanges(w).then([this]() {
Expand Down