diff --git a/.github/workflows/generate-alpha-tag.yaml b/.github/workflows/generate-alpha-tag.yaml index fe9fca8..7cab5ba 100644 --- a/.github/workflows/generate-alpha-tag.yaml +++ b/.github/workflows/generate-alpha-tag.yaml @@ -16,7 +16,7 @@ jobs: steps: - name: Checkout - uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4 + uses: actions/checkout@v4 with: fetch-depth: 0 @@ -42,7 +42,7 @@ jobs: steps: - name: Checkout - uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4 + uses: actions/checkout@v4 with: fetch-depth: 0 token: ${{ secrets.TOKEN_GITHUB_ACTION }} diff --git a/.github/workflows/generate-tag.yaml b/.github/workflows/generate-tag.yaml index c60f7ca..1e53022 100644 --- a/.github/workflows/generate-tag.yaml +++ b/.github/workflows/generate-tag.yaml @@ -34,7 +34,7 @@ jobs: steps: - name: Checkout - uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4 + uses: actions/checkout@v4 with: fetch-depth: 0 @@ -60,7 +60,7 @@ jobs: steps: - name: Checkout - uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4 + uses: actions/checkout@v4 with: fetch-depth: 0 token: ${{ secrets.TOKEN_GITHUB_ACTION }} diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index d7e5186..6cb6af4 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -24,7 +24,7 @@ jobs: timeout-minutes: 150 runs-on: macos-latest steps: - - uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4 + - uses: actions/checkout@v4 with: fetch-depth: 0 token: ${{ secrets.TOKEN_GITHUB_ACTION }} diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 2a625e6..e125dd6 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -15,7 +15,7 @@ jobs: steps: - name: Checkout - uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4 + uses: actions/checkout@v4 with: fetch-depth: 0 @@ -44,7 +44,7 @@ jobs: timeout-minutes: 60 steps: - - uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4 + - uses: actions/checkout@v4 with: fetch-depth: 0 diff --git a/.gitignore b/.gitignore index faf7375..e8eec4f 100644 --- a/.gitignore +++ b/.gitignore @@ -143,3 +143,5 @@ _site/ .sass-cache/ .jekyll-cache/ .jekyll-metadata + +.kotlin \ No newline at end of file diff --git a/README.MD b/README.MD index 76039a9..1ece113 100644 --- a/README.MD +++ b/README.MD @@ -10,9 +10,10 @@ dependencies { ## Rationale -When building applications that require graceful shutdown it typically requires us to write a lot of platform-specific -code. -This library aims to solve that problem by leveraging Kotlin MPP using KotlinX Coroutines, and Structured Concurrency. +When writing software we need to deal with the lifecycle of the application such as termination signals, and sending +correct exit codes. This is important, so we correctly interact with the OS our application runs on. +This requires a lot of platform-specific code, SuspendApp solves that problem by leveraging Kotlin MPP using KotlinX Coroutines, and Structured Concurrency. +See [Simple Example below](#simple-example). Currently supported targets: @@ -118,16 +119,18 @@ shutdown. More information on this can be found in this blog by [Phil Pearl](https://philpearl.github.io/post/k8s_ingress/), and on [learnk8s.io](https://learnk8s.io/graceful-shutdown). -The module `suspendapp-ktor` provides a `server` constructor that lifts the Ktor `ApplicationEngine` in to a `Resource`, +The module `suspendapp-ktor` provides a `server` constructor that lifts the Ktor `ApplicationEngine` in to a `Resource`, representing the _Engine_ running an `Application`(i.e `Netty`) while supporting auto-reload. Check the official Ktor documentation to learn more about [watchPaths](https://ktor.io/docs/auto-reload.html). -When our `release` function of our `ApplicationEngine` is called, there is a `wait` period before the beginning of the stop -process (defaulted to `30.seconds`), this gives Kubernetes enough time to do all its network management before we shut down. -Two more parameters are available: `grace` which set the number of seconds during which already inflight requests are -allowed to continue before the shutdown process begins, and `timeout` which set the number of seconds after which the +When our `release` function of our `ApplicationEngine` is called, there is a `wait` period before the beginning of the +stop +process (defaulted to `30.seconds`), this gives Kubernetes enough time to do all its network management before we shut +down. +Two more parameters are available: `grace` which set the number of seconds during which already inflight requests are +allowed to continue before the shutdown process begins, and `timeout` which set the number of seconds after which the server will be forceably shutdown. In the case that `ktor` server is set in -[development mode](https://ktor.io/docs/development-mode.html), the `wait` period is ignored. +[development mode](https://ktor.io/docs/development-mode.html), the `wait` period is ignored. Given this `Resource` definition of a Ktor server, with support for gracefully shutting down for K8S we can define a `SuspendApp`. @@ -218,7 +221,8 @@ You can run your NodeJS app with the following `node` command, and if you press `ctrl+c` within the first 2500ms you will see the following output. ```text -node build/js/packages/YourAppName/kotlin/YourAppName.js +./gradlew compileProductionExecutableKotlinJs +node example/build/compileSync/js/main/productionExecutable/kotlin/suspendapp-example.js App Started! Waiting until asked to shutdown. ^CCleaning up App... will take 10 seconds... @@ -248,7 +252,7 @@ You can run your Native app with the following command, and if you press `ctrl+c` within the first 2500ms you will see the following output. ```text -./gradlew build +./gradlew linkReleaseExecutableMacosArm64 build/bin/native/releaseExecutable/YourAppName.kexe App Started! Waiting until asked to shutdown. diff --git a/api/suspendapp.api b/api/suspendapp.api index f403213..715009f 100644 --- a/api/suspendapp.api +++ b/api/suspendapp.api @@ -1,11 +1,29 @@ -public final class arrow/continuations/SuspendAppKt { - public static final fun SuspendApp-8Mi8wO0 (Lkotlin/coroutines/CoroutineContext;JLkotlin/jvm/functions/Function2;)V - public static synthetic fun SuspendApp-8Mi8wO0$default (Lkotlin/coroutines/CoroutineContext;JLkotlin/jvm/functions/Function2;ILjava/lang/Object;)V +public final class arrow/continuations/Enviroment_jvmKt { + public static final fun process ()Larrow/continuations/Process; +} + +public final class arrow/continuations/JvmProcess : arrow/continuations/Process { + public static final field INSTANCE Larrow/continuations/JvmProcess; + public fun close ()V + public fun exit (I)Ljava/lang/Void; + public synthetic fun exit (I)V + public fun onShutdown (Lkotlin/jvm/functions/Function1;)Lkotlin/jvm/functions/Function1; + public fun onSigInt (Lkotlin/jvm/functions/Function2;)V + public fun onSigTerm (Lkotlin/jvm/functions/Function2;)V + public fun runScope (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)V } -public final class arrow/continuations/unsafe/Unsafe { - public static final field INSTANCE Larrow/continuations/unsafe/Unsafe; - public final fun onShutdown (Lkotlin/jvm/functions/Function1;)Lkotlin/jvm/functions/Function0; - public final fun runCoroutineScope (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)V +public abstract interface class arrow/continuations/Process : java/lang/AutoCloseable { + public abstract fun close ()V + public abstract fun exit (I)V + public abstract fun onShutdown (Lkotlin/jvm/functions/Function1;)Lkotlin/jvm/functions/Function1; + public abstract fun onSigInt (Lkotlin/jvm/functions/Function2;)V + public abstract fun onSigTerm (Lkotlin/jvm/functions/Function2;)V + public abstract fun runScope (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)V +} + +public final class arrow/continuations/SuspendAppKt { + public static final fun SuspendApp-1Y68eR8 (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;JLarrow/continuations/Process;Lkotlin/jvm/functions/Function2;)V + public static synthetic fun SuspendApp-1Y68eR8$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;JLarrow/continuations/Process;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)V } diff --git a/build.gradle.kts b/build.gradle.kts index 9cd0345..19d6819 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,7 +1,6 @@ import org.jetbrains.dokka.gradle.DokkaTask import org.jetbrains.kotlin.gradle.tasks.KotlinCompile -@Suppress("DSL_SCOPE_VIOLATION") plugins { alias(libs.plugins.kotlin.multiplatform) alias(libs.plugins.dokka) @@ -32,7 +31,8 @@ kotlin { js(IR) { nodejs() } - + + linuxArm64() linuxX64() mingwX64() macosArm64() @@ -51,13 +51,15 @@ kotlin { val linuxX64Main by getting val macosArm64Main by getting val macosX64Main by getting - + val linuxArm64Main by getting + create("nativeMain") { dependsOn(commonMain) - mingwX64Main.dependsOn(this) linuxX64Main.dependsOn(this) macosArm64Main.dependsOn(this) macosX64Main.dependsOn(this) + mingwX64Main.dependsOn(this) + linuxArm64Main.dependsOn(this) } } } @@ -73,6 +75,7 @@ tasks { matchingRegex.set(".*\\.unsafe.*") suppress.set(true) } + externalDocumentationLink("https://kotlinlang.org/api/kotlinx.coroutines/") sourceLink { localDirectory.set(file("src/commonMain/kotlin")) remoteUrl.set(uri("https://github.com/arrow-kt/suspendapp/tree/main/src/commonMain/kotlin").toURL()) @@ -81,7 +84,7 @@ tasks { } } } - + withType().configureEach { kotlinOptions.jvmTarget = "1.8" } diff --git a/example/build.gradle.kts b/example/build.gradle.kts index 5215599..31b8410 100644 --- a/example/build.gradle.kts +++ b/example/build.gradle.kts @@ -1,33 +1,38 @@ +import org.jetbrains.kotlin.gradle.ExperimentalKotlinGradlePluginApi +import org.jetbrains.kotlin.gradle.plugin.mpp.NativeBuildType.RELEASE + plugins { kotlin("multiplatform") } repositories { - mavenCentral() + mavenCentral() } kotlin { - // TODO fix setup for Main-Class - // jvm() - js(IR) { - nodejs { - binaries.executable() + jvm { + @OptIn(ExperimentalKotlinGradlePluginApi::class) + mainRun { + mainClass.set("io.arrow.suspendapp.example.MaintKt") } + withJava() } - - linuxX64 { - binaries.executable() - } - mingwX64 { - binaries.executable() - } - macosArm64 { + js(IR) { + nodejs() binaries.executable() } - macosX64 { - binaries.executable() + + listOf( + linuxX64(), + mingwX64(), + macosArm64(), + macosX64() + ).forEach { target -> + target.binaries.executable(listOf(RELEASE)) { + entryPoint = "io.arrow.suspendapp.example.main" + } } - + sourceSets { val commonMain by getting { dependencies { @@ -35,20 +40,12 @@ kotlin { implementation("io.arrow-kt:arrow-fx-coroutines:1.2.0") } } - - // val jvmMain by getting + + val jvmMain by getting val jsMain by getting val mingwX64Main by getting val linuxX64Main by getting val macosArm64Main by getting val macosX64Main by getting - - create("nativeMain") { - dependsOn(commonMain) - mingwX64Main.dependsOn(this) - linuxX64Main.dependsOn(this) - macosArm64Main.dependsOn(this) - macosX64Main.dependsOn(this) - } } } diff --git a/example/src/commonMain/kotlin/Main.kt b/example/src/commonMain/kotlin/io/arrow/suspendapp/example/Main.kt similarity index 93% rename from example/src/commonMain/kotlin/Main.kt rename to example/src/commonMain/kotlin/io/arrow/suspendapp/example/Main.kt index 9f950f9..8d38c68 100644 --- a/example/src/commonMain/kotlin/Main.kt +++ b/example/src/commonMain/kotlin/io/arrow/suspendapp/example/Main.kt @@ -1,3 +1,5 @@ +package io.arrow.suspendapp.example + import arrow.continuations.SuspendApp import arrow.fx.coroutines.resource import kotlinx.coroutines.awaitCancellation diff --git a/gradle.properties b/gradle.properties index 59064ee..51fb9bf 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,7 +1,5 @@ -# Package definitions projects.group=io.arrow-kt -# Pomfile definitions pom.name=suspendapp pom.description=Reason about resource-safety in the same way you reason about Structured Concurrency with SuspendApp! pom.url=https://github.com/arrow-kt/suspendapp/ @@ -13,17 +11,8 @@ pom.smc.url=https://github.com/arrow-kt/arrow/ pom.smc.connection=scm:git:git://github.com/arrow-kt/arrow.git pom.smc.developerConnection=scm:git:ssh://git@github.com/arrow-kt/arrow.git -# Gradle options org.gradle.jvmargs=-Xmx4g org.gradle.parallel=true -# To disable publishing of sha-512 checksums for maven-metadata.xml files -systemProp.org.gradle.internal.publish.checksums.insecure=true - -# Kotlin configuration kotlin.incremental=true kotlin.code.style=official -kotlin.mpp.enableCompatibilityMetadataVariant=true -kotlin.native.enableDependencyPropagation=false -kotlin.js.generate.executable.default=false -kotlin.native.binary.memoryModel=experimental diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 994c36b..558b30e 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,6 +1,6 @@ [versions] -kotlin = "1.9.10" -coroutines = "1.7.2" +kotlin = "1.9.24" +coroutines = "1.8.0" dokka = "1.9.20" arrowGradleConfig = "0.12.0-rc.6" kotlinBinaryCompatibilityValidator = "0.14.0" diff --git a/src/commonMain/kotlin/arrow/continuations/Process.kt b/src/commonMain/kotlin/arrow/continuations/Process.kt new file mode 100644 index 0000000..242b072 --- /dev/null +++ b/src/commonMain/kotlin/arrow/continuations/Process.kt @@ -0,0 +1,35 @@ +package arrow.continuations + +import kotlin.coroutines.CoroutineContext +import kotlinx.coroutines.CoroutineScope + +/** KMP constructor for [Process]. */ +expect fun process(): Process + +/** + * [Process] offers a common API to work with our application's process, installing signal handlers, + * shutdown hooks, running scopes in our process (runBlocking), and exiting the process. + */ +@OptIn(ExperimentalStdlibApi::class) +interface Process : AutoCloseable { + fun onSigTerm(block: suspend (code: Int) -> Unit): Unit + + fun onSigInt(block: suspend (code: Int) -> Unit): Unit + + fun onShutdown(block: suspend () -> Unit): suspend () -> Unit + + /** + * On JVM, and Native this will use kotlinx.coroutines.runBlocking, On NodeJS we need an infinite + * heartbeat to keep main alive. The heartbeat is fast enough that it isn't silently discarded, as + * longer ticks are, but slow enough that we don't interrupt often. + * https://stackoverflow.com/questions/23622051/how-to-forcibly-keep-a-node-js-process-from-terminating + */ + fun runScope( + context: CoroutineContext, + block: suspend CoroutineScope.() -> Unit, + ) + + fun exit(code: Int): Unit + + override fun close(): Unit +} diff --git a/src/commonMain/kotlin/arrow/continuations/SuspendApp.kt b/src/commonMain/kotlin/arrow/continuations/SuspendApp.kt index 4020c9b..3299520 100644 --- a/src/commonMain/kotlin/arrow/continuations/SuspendApp.kt +++ b/src/commonMain/kotlin/arrow/continuations/SuspendApp.kt @@ -1,16 +1,9 @@ package arrow.continuations -import arrow.continuations.unsafe.Unsafe import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext import kotlin.time.Duration -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.CoroutineStart -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancelAndJoin -import kotlinx.coroutines.launch -import kotlinx.coroutines.withTimeout +import kotlinx.coroutines.* /** * An unsafe blocking edge that wires the [CoroutineScope] (and structured concurrency) to the @@ -26,15 +19,38 @@ import kotlinx.coroutines.withTimeout * regardless of finalizers. * @param block the lambda of the actual application. */ +@OptIn(ExperimentalStdlibApi::class) fun SuspendApp( context: CoroutineContext = Dispatchers.Default, + uncaught: (Throwable) -> Unit = Throwable::printStackTrace, timeout: Duration = Duration.INFINITE, + process: Process = process(), block: suspend CoroutineScope.() -> Unit, ): Unit = - Unsafe.runCoroutineScope(context) { - val job: Job = launch(context = context, start = CoroutineStart.LAZY) { block() } - val unregister: () -> Unit = Unsafe.onShutdown { withTimeout(timeout) { job.cancelAndJoin() } } - job.start() - job.join() - unregister() + process.use { env -> + env.runScope(context) { + val job = + launch(start = CoroutineStart.LAZY) { + try { + block() + env.exit(0) + } catch (_: SuspendAppShutdown) {} catch (e: Throwable) { + uncaught(e) + env.exit(-1) + } + } + val unregister = + env.onShutdown { + withTimeout(timeout) { + job.cancel(SuspendAppShutdown) + job.join() + } + } + job.start() + job.join() + unregister() + } } + +/** Marker type so track shutdown signal */ +private object SuspendAppShutdown : CancellationException("SuspendApp shutting down.") diff --git a/src/commonMain/kotlin/arrow/continuations/unsafe/unsafe.kt b/src/commonMain/kotlin/arrow/continuations/unsafe/unsafe.kt deleted file mode 100644 index 6f6d2d8..0000000 --- a/src/commonMain/kotlin/arrow/continuations/unsafe/unsafe.kt +++ /dev/null @@ -1,13 +0,0 @@ -package arrow.continuations.unsafe - -import kotlin.coroutines.CoroutineContext -import kotlinx.coroutines.CoroutineScope - -expect object Unsafe { - fun onShutdown(block: suspend () -> Unit): () -> Unit - - fun runCoroutineScope( - context: CoroutineContext, - block: suspend CoroutineScope.() -> Unit, - ) -} diff --git a/src/jsMain/kotlin/arrow/continuations/unsafe/unsafe.kt b/src/jsMain/kotlin/arrow/continuations/Enviroment.js.kt similarity index 61% rename from src/jsMain/kotlin/arrow/continuations/unsafe/unsafe.kt rename to src/jsMain/kotlin/arrow/continuations/Enviroment.js.kt index 9e4ad8b..8d972ca 100644 --- a/src/jsMain/kotlin/arrow/continuations/unsafe/unsafe.kt +++ b/src/jsMain/kotlin/arrow/continuations/Enviroment.js.kt @@ -1,4 +1,4 @@ -package arrow.continuations.unsafe +package arrow.continuations import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext @@ -16,32 +16,30 @@ import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.coroutines.promise -actual object Unsafe { - @Suppress("UNUSED_PARAMETER") - private fun exitProcess(i: Int) { - runCatching { js("process.exit(i)") } +actual fun process(): Process = JsProcess + +object JsProcess : Process { + override fun onShutdown(block: suspend () -> Unit): suspend () -> Unit { + onSigTerm { code -> exitAfter(128 + code) { block() } } + onSigInt { code -> exitAfter(128 + code) { block() } } + return { /* Nothing to unregister */} } - actual fun onShutdown(block: suspend () -> Unit): () -> Unit { - suspend fun run(code: Int): Result = - runCatching { - block() - exitProcess(code) - } - .onFailure { - it.printStackTrace() - exitProcess(-1) - } + override fun onSigTerm(block: suspend (code: Int) -> Unit) = onSignal("SIGTERM") { block(15) } + + override fun onSigInt(block: suspend (code: Int) -> Unit) = onSignal("SIGINT") { block(2) } - onSignal("SIGTERM") { run(143) } - onSignal("SIGINT") { run(130) } - return {} + @OptIn(DelicateCoroutinesApi::class) + @Suppress("UNUSED_PARAMETER") + private fun onSignal(signal: String, block: suspend () -> Unit) { + @Suppress("UNUSED_VARIABLE") + val provide: () -> Promise = { GlobalScope.promise { block() } } + js("process.on(signal, function() {\n" + " provide()\n" + "});") } - actual fun runCoroutineScope( - context: CoroutineContext, - block: suspend CoroutineScope.() -> Unit, - ) { + private val jobs: MutableList = mutableListOf() + + override fun runScope(context: CoroutineContext, block: suspend CoroutineScope.() -> Unit) { val innerJob = Job() val innerScope = CoroutineScope(innerJob) suspend { @@ -62,12 +60,21 @@ actual object Unsafe { .startCoroutine(Continuation(EmptyCoroutineContext) {}) } - @OptIn(DelicateCoroutinesApi::class) - @Suppress("UNUSED_PARAMETER") - private fun onSignal(signal: String, block: suspend () -> Unit) { - @Suppress("UNUSED_VARIABLE") - val provide: () -> Promise = { GlobalScope.promise { block() } } + override fun exit(code: Int) { + runCatching { js("process.exit(code)") } + } - js("process.on(signal, function() {\n" + " provide()\n" + "});") + override fun close() { + suspend { jobs.forEach { it.cancelAndJoin() } } + .startCoroutine(Continuation(EmptyCoroutineContext) {}) } } + +private inline fun Process.exitAfter(code: Int, block: () -> Unit): Unit = + try { + block() + exit(code) + } catch (e: Throwable) { + e.printStackTrace() + exit(-1) + } diff --git a/src/jvmMain/kotlin/arrow/continuations/Enviroment.jvm.kt b/src/jvmMain/kotlin/arrow/continuations/Enviroment.jvm.kt new file mode 100644 index 0000000..e04300a --- /dev/null +++ b/src/jvmMain/kotlin/arrow/continuations/Enviroment.jvm.kt @@ -0,0 +1,63 @@ +package arrow.continuations + +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.coroutines.CoroutineContext +import kotlin.system.exitProcess +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.runBlocking +import sun.misc.Signal +import sun.misc.SignalHandler + +actual fun process(): Process = JvmProcess + +object JvmProcess : Process { + override fun onShutdown(block: suspend () -> Unit): suspend () -> Unit { + val isShutdown = AtomicBoolean(false) + fun shutdown() { + if (!isShutdown.getAndSet(true)) + runBlocking { + // We don't call exit from ShutdownHook on JVM + try { + block() + } catch (e: Throwable) { + e.printStackTrace() + } + } + } + + val hook = Thread(::shutdown, "Arrow-kt SuspendApp JVM ShutdownHook") + Runtime.getRuntime().addShutdownHook(hook) + return { + if (!isShutdown.get()) { + Runtime.getRuntime().removeShutdownHook(hook) + } + } + } + + override fun onSigTerm(block: suspend (code: Int) -> Unit) = + addSignalHandler("SIGTERM") { block(15) } + + override fun onSigInt(block: suspend (code: Int) -> Unit) = + addSignalHandler("SIGINT") { block(2) } + + private fun addSignalHandler(signal: String, action: suspend () -> Unit): Unit = + try { + var handle: SignalHandler? = null + handle = + Signal.handle(Signal(signal)) { prev -> + runBlocking { action() } + if (handle != SignalHandler.SIG_DFL && handle != SignalHandler.SIG_IGN) { + handle?.handle(prev) + } + } + } catch (e: Throwable) { + /* Ignore */ + } + + override fun runScope(context: CoroutineContext, block: suspend CoroutineScope.() -> Unit) = + runBlocking(context, block) + + override fun exit(code: Int): Nothing = exitProcess(code) + + override fun close(): Unit = Unit +} diff --git a/src/jvmMain/kotlin/arrow/continuations/unsafe/unsafe.kt b/src/jvmMain/kotlin/arrow/continuations/unsafe/unsafe.kt deleted file mode 100644 index 30134ba..0000000 --- a/src/jvmMain/kotlin/arrow/continuations/unsafe/unsafe.kt +++ /dev/null @@ -1,38 +0,0 @@ -package arrow.continuations.unsafe - -import java.util.concurrent.atomic.AtomicBoolean -import kotlin.coroutines.CoroutineContext -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.runBlocking - -actual object Unsafe { - actual fun onShutdown(block: suspend () -> Unit): () -> Unit { - val isShutdown = AtomicBoolean(false) - // ON JS and Native we return 143 and 130 on SIGTERM and SIGINT - val hook = - Thread( - { - runBlocking { - isShutdown.set(true) - runCatching { block() } - .onFailure { - // Change to default error handler lambda - it.printStackTrace() - } - } - }, - "Shutdown hook" - ) - Runtime.getRuntime().addShutdownHook(hook) - return { - if (!isShutdown.get()) { - Runtime.getRuntime().removeShutdownHook(hook) - } - } - } - - actual fun runCoroutineScope( - context: CoroutineContext, - block: suspend CoroutineScope.() -> Unit, - ): Unit = runBlocking(context, block) -} diff --git a/src/nativeMain/kotlin/arrow/continuations/Enviroment.native.kt b/src/nativeMain/kotlin/arrow/continuations/Enviroment.native.kt new file mode 100644 index 0000000..439083f --- /dev/null +++ b/src/nativeMain/kotlin/arrow/continuations/Enviroment.native.kt @@ -0,0 +1,109 @@ +package arrow.continuations + +import kotlin.coroutines.CoroutineContext +import kotlin.experimental.ExperimentalNativeApi +import kotlin.system.exitProcess +import kotlinx.cinterop.CFunction +import kotlinx.cinterop.CPointer +import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.cinterop.staticCFunction +import kotlinx.coroutines.* +import platform.posix.SIGINT +import platform.posix.SIGTERM +import platform.posix.signal + +public actual fun process(): Process = NativeProcess() + +public val SIGINFO: Int = 29 + +@OptIn(ExperimentalNativeApi::class) +public val SIGUSR1: Int? = + when (Platform.osFamily) { + OsFamily.LINUX -> 10 + OsFamily.MACOSX -> 30 + else -> null + } + +@OptIn(ExperimentalForeignApi::class, ExperimentalStdlibApi::class) +private class NativeProcess : Process, AutoCloseable { + private val job = SupervisorJob() + private val scope = CoroutineScope(SIGNAL_DISPATCHER + job) + + override fun onShutdown(block: suspend () -> Unit): suspend () -> Unit { + onSigTerm { exitAfter(it + 128) { block() } } + onSigInt { exitAfter(it + 128) { block() } } + return { /* Nothing to unregister */} + } + + override fun exit(code: Int): Nothing = exitProcess(code) + + override fun onSigTerm(block: suspend (code: Int) -> Unit) = + onSignal(SIGTERM, SIGTERM_HANDLER, TERMINATED, block) + + override fun onSigInt(block: suspend (code: Int) -> Unit) = + onSignal(SIGINT, SIGINT_HANDLER, INTERRUPTED, block) + + private fun onSignal( + code: Int, + handler: CPointer Unit>>, + signal: CompletableDeferred, + block: suspend (code: Int) -> Unit, + ) { + scope.launch { + val signalCode = signal.await() + block(signalCode) + } + signal(code, handler) + } + + @OptIn(ExperimentalNativeApi::class, ExperimentalStdlibApi::class) + override fun close(): Unit = runBlocking { + // TODO join all jobs, and re-throw all exceptions ?? + // All jobs should've finished when the Enviroment is closed. + assert(job.children.none()) { "Job should not have any children anymore." } + listOf(kotlin.runCatching { job.cancelAndJoin() }, runCatching { SIGNAL_DISPATCHER.close() }) + .getOrThrow() + } + + override fun runScope(context: CoroutineContext, block: suspend CoroutineScope.() -> Unit) = + runBlocking(context, block) +} + +private fun List>.getOrThrow() = + fold(null) { acc: Throwable?, result -> + val other = result.exceptionOrNull() + other?.let { acc?.apply { addSuppressed(other) } ?: other } ?: acc + } + +private val TERMINATED: CompletableDeferred = CompletableDeferred() + +private val INTERRUPTED: CompletableDeferred = CompletableDeferred() + +@OptIn(ExperimentalForeignApi::class) +private val SIGTERM_HANDLER = + staticCFunction { code -> + TERMINATED.complete(code) + runBlocking(SIGNAL_DISPATCHER) { BACKPRESSURE.await() } + } + +@OptIn(ExperimentalForeignApi::class) +private val SIGINT_HANDLER = + staticCFunction { code -> + INTERRUPTED.complete(code) + runBlocking(SIGNAL_DISPATCHER) { BACKPRESSURE.await() } + } + +private val BACKPRESSURE: CompletableDeferred = CompletableDeferred() + +@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class) +private val SIGNAL_DISPATCHER: CloseableCoroutineDispatcher = + newSingleThreadContext("arrow-kt-suspendapp-signal-dispatcher") + +private inline fun Process.exitAfter(code: Int, block: () -> Unit): Unit = + try { + block() + exit(code) + } catch (e: Throwable) { + e.printStackTrace() + exit(-1) + } diff --git a/src/nativeMain/kotlin/arrow/continuations/unsafe/unsafe.kt b/src/nativeMain/kotlin/arrow/continuations/unsafe/unsafe.kt deleted file mode 100644 index a22e549..0000000 --- a/src/nativeMain/kotlin/arrow/continuations/unsafe/unsafe.kt +++ /dev/null @@ -1,48 +0,0 @@ -package arrow.continuations.unsafe - -import kotlin.coroutines.CoroutineContext -import kotlin.system.exitProcess -import kotlinx.cinterop.ExperimentalForeignApi -import kotlinx.cinterop.staticCFunction -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.DelicateCoroutinesApi -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking -import platform.posix.SIGINT -import platform.posix.SIGTERM -import platform.posix.signal - -@SharedImmutable private val SIGNAL: CompletableDeferred = CompletableDeferred() - -@SharedImmutable private val BACKPRESSURE: CompletableDeferred = CompletableDeferred() - -@SharedImmutable -@OptIn(ExperimentalForeignApi::class) -private val SignalHandler = - staticCFunction { code -> - SIGNAL.complete(code) - val finalCode: Int = runBlocking { BACKPRESSURE.await() } - exitProcess(finalCode) - } - -actual object Unsafe { - @OptIn(DelicateCoroutinesApi::class, ExperimentalForeignApi::class) - actual fun onShutdown(block: suspend () -> Unit): () -> Unit { - GlobalScope.launch { - val code: Int = SIGNAL.await() - val res: Result = runCatching { block() } - BACKPRESSURE.complete(res.fold({ code }, { -1 })) - } - - signal(SIGTERM, SignalHandler) - signal(SIGINT, SignalHandler) - return {} - } - - actual fun runCoroutineScope( - context: CoroutineContext, - block: suspend CoroutineScope.() -> Unit, - ): Unit = runBlocking(context, block) -} diff --git a/suspendapp-ktor/api/suspendapp-ktor.api b/suspendapp-ktor/api/suspendapp-ktor.api index e319e02..f0ecc0d 100644 --- a/suspendapp-ktor/api/suspendapp-ktor.api +++ b/suspendapp-ktor/api/suspendapp-ktor.api @@ -1,6 +1,6 @@ public final class arrow/continuations/ktor/KtorServerKt { - public static final fun server-NrqJHPo (Larrow/fx/coroutines/continuations/ResourceScope;Lio/ktor/server/engine/ApplicationEngineFactory;Lio/ktor/server/engine/ApplicationEngineEnvironment;JJJLkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static synthetic fun server-NrqJHPo$default (Larrow/fx/coroutines/continuations/ResourceScope;Lio/ktor/server/engine/ApplicationEngineFactory;Lio/ktor/server/engine/ApplicationEngineEnvironment;JJJLkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static final fun server--IzA1Es (Larrow/fx/coroutines/continuations/ResourceScope;Lio/ktor/server/engine/ApplicationEngineFactory;Lio/ktor/server/engine/ApplicationEngineEnvironment;Lkotlin/jvm/functions/Function1;JJJLkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun server--IzA1Es$default (Larrow/fx/coroutines/continuations/ResourceScope;Lio/ktor/server/engine/ApplicationEngineFactory;Lio/ktor/server/engine/ApplicationEngineEnvironment;Lkotlin/jvm/functions/Function1;JJJLkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static final fun server-T7_icE8 (Larrow/fx/coroutines/continuations/ResourceScope;Lio/ktor/server/engine/ApplicationEngineFactory;ILjava/lang/String;Ljava/util/List;Lkotlin/jvm/functions/Function1;JJJLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun server-T7_icE8$default (Larrow/fx/coroutines/continuations/ResourceScope;Lio/ktor/server/engine/ApplicationEngineFactory;ILjava/lang/String;Ljava/util/List;Lkotlin/jvm/functions/Function1;JJJLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; } diff --git a/suspendapp-ktor/build.gradle.kts b/suspendapp-ktor/build.gradle.kts index 7a5a10a..5bd8cf1 100644 --- a/suspendapp-ktor/build.gradle.kts +++ b/suspendapp-ktor/build.gradle.kts @@ -1,7 +1,5 @@ -@Suppress("DSL_SCOPE_VIOLATION") plugins { id(libs.plugins.kotlin.multiplatform.get().pluginId) - alias(libs.plugins.arrowGradleConfig.formatter) alias(libs.plugins.arrowGradleConfig.publish) alias(libs.plugins.arrowGradleConfig.versioning) @@ -35,7 +33,7 @@ kotlin { val linuxX64Main by getting val macosArm64Main by getting val macosX64Main by getting - + create("nativeMain") { dependsOn(commonMain) linuxX64Main.dependsOn(this) diff --git a/suspendapp-ktor/gradle.properties b/suspendapp-ktor/gradle.properties index 8dc6bc8..09a9fa3 100644 --- a/suspendapp-ktor/gradle.properties +++ b/suspendapp-ktor/gradle.properties @@ -1,9 +1,3 @@ -# Package definitions projects.group=io.arrow-kt - -# Pomfile definitions pom.name=suspendapp-ktor pom.description=Ktor engine creation as a Resource, which will gracefully shut down when it's finished. - -# Kotlin configuration -kotlin.mpp.enableCompatibilityMetadataVariant=false diff --git a/suspendapp-ktor/src/commonMain/kotlin/arrow/continuations/ktor/KtorServer.kt b/suspendapp-ktor/src/commonMain/kotlin/arrow/continuations/ktor/KtorServer.kt index e856d3b..bf3ab95 100644 --- a/suspendapp-ktor/src/commonMain/kotlin/arrow/continuations/ktor/KtorServer.kt +++ b/suspendapp-ktor/src/commonMain/kotlin/arrow/continuations/ktor/KtorServer.kt @@ -52,15 +52,7 @@ suspend fun < ) .apply(ApplicationEngine::start) }) { engine, _ -> - if (!engine.environment.developmentMode) { - engine.environment.log.info( - "prewait delay of ${preWait.inWholeMilliseconds}ms, turn it off using io.ktor.development=true" - ) - delay(preWait.inWholeMilliseconds) - } - engine.environment.log.info("Shutting down HTTP server...") - engine.stop(grace.inWholeMilliseconds, timeout.inWholeMicroseconds) - engine.environment.log.info("HTTP server shutdown!") + engine.release(preWait, grace, timeout) } /** @@ -82,21 +74,32 @@ suspend fun < .server( factory: ApplicationEngineFactory, environment: ApplicationEngineEnvironment, + configure: TConfiguration.() -> Unit = {}, preWait: Duration = 30.seconds, grace: Duration = 500.milliseconds, timeout: Duration = 500.milliseconds ): ApplicationEngine = - install({ embeddedServer(factory, environment).apply(ApplicationEngine::start) }) { engine, _ -> - if (!engine.environment.developmentMode) { - engine.environment.log.info( - "prewait delay of ${preWait.inWholeMilliseconds}ms, turn it off using io.ktor.development=true" - ) - delay(preWait.inWholeMilliseconds) - } - engine.environment.log.info("Shutting down HTTP server...") - engine.stop(grace.inWholeMilliseconds, timeout.inWholeMicroseconds) - engine.environment.log.info("HTTP server shutdown!") + install({ embeddedServer(factory, environment, configure).apply(ApplicationEngine::start) }) { + engine, + _ -> + engine.release(preWait, grace, timeout) + } + +private suspend fun ApplicationEngine.release( + preWait: Duration, + grace: Duration, + timeout: Duration +) { + if (!environment.developmentMode) { + environment.log.info( + "prewait delay of ${preWait.inWholeMilliseconds}ms, turn it off using io.ktor.development=true" + ) + delay(preWait.inWholeMilliseconds) } + environment.log.info("Shutting down HTTP server...") + stop(grace.inWholeMilliseconds, timeout.inWholeMicroseconds) + environment.log.info("HTTP server shutdown!") +} // Ported from Ktor: // https://github.com/ktorio/ktor/blob/0de7948fbe3f78673f4f90de9c5ea5986691819a/ktor-server/ktor-server-host-common/jvmAndNix/src/io/ktor/server/engine/ServerEngineUtils.kt