Skip to content

Commit

Permalink
Merge pull request #3348 from armanbilge/fix/3338
Browse files Browse the repository at this point in the history
Don't use `Dispatcher` for `suspendReadableAndRead` and TLS initialization on Node.js
  • Loading branch information
mpilquist authored Dec 23, 2023
2 parents 97b9d23 + 271f86d commit cc913dd
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 65 deletions.
2 changes: 2 additions & 0 deletions io/js/src/main/scala/fs2/io/internal/facade/events.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import scala.scalajs.js
@nowarn212("cat=unused")
private[io] trait EventEmitter extends js.Object {

protected[io] def on(eventName: String, listener: js.Function0[Unit]): this.type = js.native

protected[io] def on[E](eventName: String, listener: js.Function1[E, Unit]): this.type = js.native

protected[io] def on[E, F](eventName: String, listener: js.Function2[E, F, Unit]): this.type =
Expand Down
141 changes: 97 additions & 44 deletions io/js/src/main/scala/fs2/io/ioplatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import cats.effect.std.Queue
import cats.effect.syntax.all._
import cats.syntax.all._
import fs2.concurrent.Channel
import fs2.io.internal.MicrotaskExecutor
import fs2.io.internal.facade

import java.nio.charset.Charset
Expand All @@ -58,53 +57,107 @@ private[fs2] trait ioplatform {
def suspendReadableAndRead[F[_], R <: Readable](
destroyIfNotEnded: Boolean = true,
destroyIfCanceled: Boolean = true
)(thunk: => R)(implicit F: Async[F]): Resource[F, (R, Stream[F, Byte])] =
(for {
dispatcher <- Dispatcher.sequential[F]
channel <- Channel.unbounded[F, Unit].toResource
error <- F.deferred[Throwable].toResource
readableResource = for {
readable <- Resource.makeCase(F.delay(thunk)) {
case (readable, Resource.ExitCase.Succeeded) =>
)(thunk: => R)(implicit F: Async[F]): Resource[F, (R, Stream[F, Byte])] = {

final class Listener {
private[this] var readable = false
private[this] var error: Either[Throwable, Boolean] = null
private[this] var ended = false
private[this] var callback: Either[Throwable, Boolean] => Unit = null

def handleReadable(): Unit =
if (callback eq null) {
readable = true
} else {
val cb = callback
callback = null
cb(Right(true))
}

def handleEnd(): Unit = {
ended = true
if (!readable && (callback ne null)) {
callback(Right(false))
}
}

def handleError(e: js.Error): Unit = {
error = Left(js.JavaScriptException(e))
if (callback ne null) {
callback(error)
}
}

private[this] def next: F[Boolean] = F.async { cb =>
F.delay {
if (error ne null) {
cb(error)
None
} else if (readable) {
readable = false
cb(Right(true))
None
} else if (ended) {
cb(Right(false))
None
} else {
callback = cb
Some(F.delay { callback = null })
}
}
}

def readableEvents: Stream[F, Unit] = {
def go: Pull[F, Unit, Unit] =
Pull.eval(next).flatMap { continue =>
if (continue)
Pull.outUnit >> go
else
Pull.done
}

go.streamNoScope
}

}

Resource
.eval(F.delay(new Listener))
.flatMap { listener =>
Resource
.makeCase {
F.delay {
if (!readable.readableEnded & destroyIfNotEnded)
readable.destroy()
val readable = thunk
readable.on("readable", () => listener.handleReadable())
readable.once("error", listener.handleError(_))
readable.once("end", () => listener.handleEnd())
readable
}
case (readable, Resource.ExitCase.Errored(_)) =>
// tempting, but don't propagate the error!
// that would trigger a unhandled Node.js error that circumvents FS2/CE error channels
F.delay(readable.destroy())
case (readable, Resource.ExitCase.Canceled) =>
if (destroyIfCanceled)
} {
case (readable, Resource.ExitCase.Succeeded) =>
F.delay {
if (!readable.readableEnded & destroyIfNotEnded)
readable.destroy()
}
case (readable, Resource.ExitCase.Errored(_)) =>
// tempting, but don't propagate the error!
// that would trigger a unhandled Node.js error that circumvents FS2/CE error channels
F.delay(readable.destroy())
else
F.unit
}
_ <- readable.registerListener[F, Any]("readable", dispatcher)(_ => channel.send(()).void)
_ <- readable.registerListener[F, Any]("end", dispatcher)(_ => channel.close.void)
_ <- readable.registerListener[F, Any]("close", dispatcher)(_ => channel.close.void)
_ <- readable.registerListener[F, js.Error]("error", dispatcher) { e =>
error.complete(js.JavaScriptException(e)).void
}
} yield readable
// Implementation note: why run on the MicrotaskExecutor?
// In many cases creating a `Readable` starts async side-effects (e.g. negotiating TLS handshake or opening a file handle).
// Furthermore, these side-effects will invoke the listeners we register to the `Readable`.
// Therefore, it is critical that the listeners are registered to the `Readable` _before_ these async side-effects occur:
// in other words, before we next yield (cede) to the event loop. Because an arbitrary effect `F` (particularly `IO`) may cede at any time,
// our only recourse is to run the entire creation/listener registration process on the microtask executor.
readable <- readableResource.evalOn(MicrotaskExecutor)
stream =
(channel.stream
.concurrently(Stream.eval(error.get.flatMap(F.raiseError[Unit]))) >>
Stream
.evalUnChunk(
F.delay(
Option(readable.read())
.fold(Chunk.empty[Byte])(Chunk.uint8Array)
case (readable, Resource.ExitCase.Canceled) =>
if (destroyIfCanceled)
F.delay(readable.destroy())
else
F.unit
}
.fproduct { readable =>
listener.readableEvents.adaptError { case IOException(ex) => ex } >>
Stream.evalUnChunk(
F.delay(Option(readable.read()).fold(Chunk.empty[Byte])(Chunk.uint8Array(_)))
)
)).adaptError { case IOException(ex) => ex }
} yield (readable, stream)).adaptError { case IOException(ex) => ex }
}
}
.adaptError { case IOException(ex) => ex }
}

/** `Pipe` that converts a stream of bytes to a stream that will emit a single `Readable`,
* that ends whenever the resulting stream terminates.
Expand Down
60 changes: 39 additions & 21 deletions io/js/src/main/scala/fs2/io/net/tls/TLSContextPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,36 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type =>
clientMode: Boolean,
params: TLSParameters,
logger: TLSLogger[F]
): Resource[F, TLSSocket[F]] = (Dispatcher.sequential[F], Dispatcher.parallel[F])
.flatMapN { (seqDispatcher, parDispatcher) =>
if (clientMode) {
Resource.eval(F.deferred[Either[Throwable, Unit]]).flatMap { handshake =>
): Resource[F, TLSSocket[F]] = {

final class Listener {
private[this] var value: Either[Throwable, Unit] = null
private[this] var callback: Either[Throwable, Unit] => Unit = null

def complete(value: Either[Throwable, Unit]): Unit =
if (callback ne null) {
callback(value)
callback = null
} else {
this.value = value
}

def get: F[Unit] = F.async { cb =>
F.delay {
if (value ne null) {
cb(value)
None
} else {
callback = cb
Some(F.delay { callback = null })
}
}
}
}

(Dispatcher.parallel[F], Resource.eval(F.delay(new Listener)))
.flatMapN { (parDispatcher, listener) =>
if (clientMode) {
TLSSocket
.forAsync(
socket,
Expand All @@ -79,22 +105,17 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type =>
val tlsSock = facade.tls.connect(options)
tlsSock.once(
"secureConnect",
() => seqDispatcher.unsafeRunAndForget(handshake.complete(Either.unit))
() => listener.complete(Either.unit)
)
tlsSock.once[js.Error](
"error",
e =>
seqDispatcher.unsafeRunAndForget(
handshake.complete(Left(new js.JavaScriptException(e)))
)
e => listener.complete(Left(new js.JavaScriptException(e)))
)
tlsSock
}
)
.evalTap(_ => handshake.get.rethrow)
}
} else {
Resource.eval(F.deferred[Either[Throwable, Unit]]).flatMap { verifyError =>
.evalTap(_ => listener.get)
} else {
TLSSocket
.forAsync(
socket,
Expand All @@ -117,24 +138,21 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type =>
.map(e => new JavaScriptSSLException(js.JavaScriptException(e)))
.toLeft(())
else Either.unit
seqDispatcher.unsafeRunAndForget(verifyError.complete(result))
listener.complete(result)
}
)
tlsSock.once[js.Error](
"error",
e =>
seqDispatcher.unsafeRunAndForget(
verifyError.complete(Left(new js.JavaScriptException(e)))
)
e => listener.complete(Left(new js.JavaScriptException(e)))
)
tlsSock
}
)
.evalTap(_ => verifyError.get.rethrow)
.evalTap(_ => listener.get)
}
}
}
.adaptError { case IOException(ex) => ex }
.adaptError { case IOException(ex) => ex }
}
}

def fromSecureContext(context: SecureContext): TLSContext[F] =
Expand Down

0 comments on commit cc913dd

Please sign in to comment.