Skip to content

Commit

Permalink
Flow <-> Reactive Streams Publisher integration (#245)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw authored Nov 22, 2024
1 parent a2dc628 commit 95f83a7
Show file tree
Hide file tree
Showing 12 changed files with 470 additions and 13 deletions.
20 changes: 17 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@ compileDocumentation := {
lazy val rootProject = (project in file("."))
.settings(commonSettings)
.settings(publishArtifact := false, name := "ox")
.aggregate(core, examples, kafka, mdcLogback)
.aggregate(core, examples, kafka, mdcLogback, flowReactiveStreams)

lazy val core: Project = (project in file("core"))
.settings(commonSettings)
.settings(
name := "core",
libraryDependencies ++= Seq(
"com.softwaremill.jox" % "channels" % "0.3.1",
scalaTest
scalaTest,
"org.apache.pekko" %% "pekko-stream" % "1.1.2" % Test,
"org.reactivestreams" % "reactive-streams-tck-flow" % "1.0.4" % Test
),
Test / fork := true
)
Expand Down Expand Up @@ -81,6 +83,17 @@ lazy val mdcLogback: Project = (project in file("mdc-logback"))
)
.dependsOn(core)

// Separate module built on top of `core`, adding the `org.reactivestreams` artifact as a dependency.
// The Flow scaladocs point to this module for converting to/from reactive-streams publishers.
lazy val flowReactiveStreams: Project = (project in file("flow-reactive-streams"))
  .settings(commonSettings)
  .settings(
    name := "flow-reactive-streams",
    libraryDependencies ++= Seq(
      "org.reactivestreams" % "reactive-streams" % "1.0.4",
      scalaTest
    )
  )
  .dependsOn(core)

lazy val documentation: Project = (project in file("generated-doc")) // important: it must not be doc/
.enablePlugins(MdocPlugin)
.settings(commonSettings)
Expand All @@ -99,5 +112,6 @@ lazy val documentation: Project = (project in file("generated-doc")) // importan
.dependsOn(
core,
kafka,
mdcLogback
mdcLogback,
flowReactiveStreams
)
12 changes: 12 additions & 0 deletions core/src/main/scala/ox/Ox.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package ox
import java.util.concurrent.StructuredTaskScope
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.implicitNotFound
import ox.channels.Actor
import ox.channels.ActorRef

/** Capability granted by an [[unsupervised]] concurrency scope (as well as, via subtyping, by [[supervised]] and [[supervisedError]]).
*
Expand Down Expand Up @@ -40,6 +42,16 @@ end OxUnsupervised
trait Ox extends OxUnsupervised:
  private[ox] def asNoErrorMode: OxError[Nothing, [T] =>> T]

  // Backs `externalRunner`: an actor created on first access, whose logic runs the submitted functions with this very `Ox`
  // instance provided as the capability.
  private[ox] lazy val externalSchedulerActor: ActorRef[ExternalScheduler] =
    val runWithThisScope: ExternalScheduler = new ExternalScheduler:
      def run(f: Ox ?=> Unit): Unit = f(using Ox.this)
    Actor.create(runWithThisScope)(using this)
end Ox

/** Logic of the actor behind [[Ox.externalSchedulerActor]]: runs functions which require the [[Ox]] capability. */
private[ox] trait ExternalScheduler:
  /** Runs `f`; which [[Ox]] instance is provided to `f` is up to the implementation. */
  def run(f: Ox ?=> Unit): Unit

/** Capability granted by a [[supervisedError]] concurrency scope.
*
* Represents a capability to:
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/ox/channels/BufferCapacity.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package ox.channels
*/
opaque type BufferCapacity = Int

extension (c: BufferCapacity) def toInt: Int = c

object BufferCapacity:
def apply(c: Int): BufferCapacity = c
def newChannel[T](using BufferCapacity): Channel[T] = Channel.withCapacity[T](summon[BufferCapacity])
Expand Down
31 changes: 31 additions & 0 deletions core/src/main/scala/ox/externalRunner.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package ox

import ox.channels.ActorRef

/** Obtains a runner bound to the current concurrency scope. The runner makes it possible to schedule functions to be run within that
  * scope from arbitrary threads, including threads which are not managed by the scope.
  *
  * Usage: call this method from within a concurrency scope, on a fork/thread that is managed by that scope, and hand the resulting runner
  * over to the external library. The library can then schedule functions (e.g. ones that create forks) to be run within the scope from
  * arbitrary threads, for as long as the scope isn't complete.
  *
  * Execution is scheduled through an [[ox.channels.Actor]], which is created lazily and is bound to an [[Ox]] instance.
  *
  * This method should **only** be used when integrating Ox with libraries that manage concurrency on their own, and which run callbacks on
  * a managed thread pool. The logic executed by the third-party library should be entirely contained within the lifetime of this
  * concurrency scope. The sole purpose of this method is to enable running scope-aware logic from threads **other** than Ox-managed ones.
  *
  * Use with care!
  *
  * @see
  *   [[ExternalRunner.runAsync]]
  */
def externalRunner()(using Ox): ExternalRunner = new ExternalRunner(summon[Ox].externalSchedulerActor)

class ExternalRunner(scheduler: ActorRef[ExternalScheduler]):
  /** Schedules the given function to be run within the [[Ox]] concurrency scope in which this runner was created.
    *
    * `f` should return promptly, so that it doesn't hold up other scheduled functions. Typically, all it does is start a background fork.
    *
    * NOTE(review): the function is submitted using `ActorRef.ask`, which presumably returns only once `f` has been run by the actor -
    * confirm against the actor's documentation.
    */
  def runAsync(f: Ox ?=> Unit): Unit = scheduler.ask(logic => logic.run(f))
end ExternalRunner
9 changes: 7 additions & 2 deletions core/src/main/scala/ox/flow/Flow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@ import scala.annotation.nowarn
*
* Running a flow is possible using one of the `run*` methods, such as [[Flow.runToList]], [[Flow.runToChannel]] or [[Flow.runFold]].
*/
class Flow[+T](protected val last: FlowStage[T]) extends FlowOps[T] with FlowRunOps[T] with FlowIOOps[T] with FlowTextOps[T]
class Flow[+T](protected val last: FlowStage[T])
extends FlowOps[T]
with FlowRunOps[T]
with FlowIOOps[T]
with FlowTextOps[T]
with FlowReactiveOps[T]

object Flow extends FlowCompanionOps with FlowCompanionIOOps
object Flow extends FlowCompanionOps with FlowCompanionIOOps with FlowCompanionReactiveOps

//

Expand Down
80 changes: 80 additions & 0 deletions core/src/main/scala/ox/flow/FlowCompanionReactiveOps.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package ox.flow

import ox.channels.BufferCapacity
import ox.channels.ChannelClosed
import ox.channels.toInt
import ox.discard
import ox.forkUnsupervised
import ox.pipe
import ox.repeatWhile
import ox.tapException
import ox.unsupervised

import java.util.concurrent.Flow.Publisher
import java.util.concurrent.Flow.Subscriber
import java.util.concurrent.Flow.Subscription
import java.util.concurrent.atomic.AtomicReference

trait FlowCompanionReactiveOps:
  this: Flow.type =>

  /** Creates a [[Flow]] which emits the elements received by subscribing to the given [[Publisher]]. Every run of the flow creates a new
    * subscription.
    *
    * Elements are handed over from the subscription to the flow through a [[ox.channels.Channel]], with a capacity given by the
    * [[BufferCapacity]] in scope. That capacity is also the largest number of elements requested from the publisher at a time.
    *
    * The publisher parameter should implement the JDK 9+ `Flow.Publisher` API. To create a flow from a publisher implementing
    * `org.reactivestreams.Publisher`, use the `flow-reactive-streams` module.
    */
  def fromPublisher[T](p: Publisher[T])(using BufferCapacity): Flow[T] = usingEmitInline: emit =>
    // using an unsafe scope for efficiency
    unsupervised {
      val buffer = BufferCapacity.newChannel[T]
      val initialDemand = summon[BufferCapacity].toInt
      val requestBatchSize = math.ceil(initialDemand / 2.0).toInt

      // lets the receiving loop below get hold of the subscription, which is passed to the subscriber in a fork
      val subscriptionRef = new AtomicReference[Subscription]()
      var cachedSubscription: Subscription = null

      // number of elements received from the buffer, for which more elements haven't been requested yet
      var pendingDemand = 0

      {
        // forwards every signal from the publisher to the buffer
        val bridge = new Subscriber[T]:
          def onSubscribe(s: Subscription): Unit =
            subscriptionRef.set(s)
            s.request(initialDemand)

          def onNext(t: T): Unit = buffer.send(t)
          def onError(t: Throwable): Unit = buffer.error(t)
          def onComplete(): Unit = buffer.done()

        // unsafe, but we are sure that this won't throw any exceptions (unless there's a bug in the publisher)
        forkUnsupervised {
          p.subscribe(bridge)
        }.discard

        repeatWhile:
          buffer.receiveOrClosed() match
            case ChannelClosed.Done           => false
            case failure: ChannelClosed.Error => throw failure.toThrowable
            case element: T @unchecked =>
              emit(element)

              // since an element arrived, onSubscribe must have been called already; reading the subscription & caching it for later
              if cachedSubscription == null then cachedSubscription = subscriptionRef.get()

              // now that we've received an element from the buffer, there's room to request more
              pendingDemand += 1
              // requesting in batches, to avoid too many requests
              if pendingDemand >= requestBatchSize then
                cachedSubscription.request(pendingDemand)
                pendingDemand = 0

              true
          end match
        // exceptions might be propagated from the buffer, but they might also originate from an interruption
      }.tapException(_ => subscriptionRef.get().pipe(s => if s != null then s.cancel()))
    }
end FlowCompanionReactiveOps
149 changes: 149 additions & 0 deletions core/src/main/scala/ox/flow/FlowReactiveOps.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package ox.flow

import ox.Ox
import ox.channels.BufferCapacity
import ox.channels.Channel
import ox.channels.ChannelClosed
import ox.channels.Sink
import ox.channels.forkPropagate
import ox.channels.selectOrClosed
import ox.discard
import ox.fork
import ox.forkDiscard

import java.util.concurrent.Flow.Publisher
import java.util.concurrent.Flow.Subscriber
import java.util.concurrent.Flow.Subscription
import ox.unsupervised
import ox.externalRunner
import ox.tapException

trait FlowReactiveOps[+T]:
  outer: Flow[T] =>

  /** Exposes this [[Flow]] as a [[Publisher]]. The flow is run anew for every subscription to the publisher.
    *
    * Must be called within a concurrency scope: upon subscribing, a fork is created in that scope to run the publishing process. Hence,
    * the scope should remain active for as long as the publisher is used.
    *
    * Elements emitted by the flow are buffered, using a buffer of capacity given by the [[BufferCapacity]] in scope.
    *
    * The returned publisher implements the JDK 9+ `Flow.Publisher` API. To obtain a publisher implementing `org.reactivestreams.Publisher`,
    * use the `flow-reactive-streams` module.
    */
  def toPublisher[U >: T](using Ox, BufferCapacity): Publisher[U] =
    // the external runner has to be obtained here, while we are on a fork managed by Ox
    val scopeRunner = externalRunner()

    new Publisher[U]:
      // 1.10: subscribe can be called multiple times; each time, the flow is started from scratch
      // 1.11: subscriptions are unicast
      def subscribe(subscriber: Subscriber[? >: U]): Unit =
        if subscriber == null then throw new NullPointerException("1.9: subscriber is null")
        // 3.13: the reference to the subscriber is held only as long as the main loop in `runToSubscriber` runs
        // 3.14: not in this implementation

        // `runToSubscriber` blocks for as long as the flow produces data, or until the subscription is cancelled; as `subscribe`
        // must not block (see https://github.com/reactive-streams/reactive-streams-jvm/issues/393), it is run in a fork. The
        // reactive library might call .subscribe on a thread which isn't managed by Ox, hence the fork is started through the
        // external runner.
        scopeRunner.runAsync(forkDiscard(runToSubscriber(subscriber)).discard)
      end subscribe
    end new
  end toPublisher

  /** Runs the flow, passing the emitted elements to `subscriber` as they are requested. Returns once the subscription is cancelled, or
    * once completion or an error has been signalled to the subscriber.
    */
  private def runToSubscriber[U >: T](subscriber: Subscriber[U])(using BufferCapacity): Unit =
    // a new scope is started, so that cancelling (== completing the main body) cleans up (interrupts) any background forks
    // using an unsafe scope for efficiency: only a single fork is ever started, and all its errors are propagated
    unsupervised:
      // processing state: cancelled flag, error sent flag, outstanding demand
      var cancelled = false
      var errorSent = false
      var demand = 0L

      {
        val signals = Channel.unlimited[Signal]
        // 1.9: onSubscribe must be called first
        subscriber.onSubscribe(new FlowSubscription(signals))

        // errors & data use separate channels, so that it's possible to select from errors & signals only, without receiving data
        // 1.4: any errors from running the flow end up here
        val errors = Channel.unlimited[Nothing]
        val data = BufferCapacity.newChannel[T]

        // the flow is run in the background; any failure ends up as an error of the `errors` channel
        forkPropagate(errors) {
          last.run(FlowEmit.fromInline(t => data.send(t)))
          data.done()
        }.discard

        def markCancelled(): Unit = cancelled = true

        def failAndCancel(e: Throwable): Unit =
          if !cancelled then
            markCancelled()
            errorSent = true
            subscriber.onError(e)

        def addDemand(d: Long): Unit =
          if d > 0 then
            val increased = demand + d
            // 3.17: when demand overflows `Long.MaxValue`, this is treated as the signalled demand to be "effectively unbounded"
            demand = if increased < 0 then Long.MaxValue else increased
          else failAndCancel(new IllegalArgumentException("3.9: demand must be positive"))

        // main processing loop: runs until the subscription is cancelled, which is also the case once a terminal signal is sent
        while !cancelled do // 1.7, 3.12 - ending the main loop after onComplete/onError
          if demand != 0 then
            // there is outstanding demand: data is received as well
            selectOrClosed(errors.receiveClause, signals.receiveClause, data.receiveClause) match
              case signals.Received(Signal.Request(n)) => addDemand(n)
              case signals.Received(Signal.Cancel)     => markCancelled()
              case errors.Received(_)                  => // impossible
              case data.Received(t: T) =>
                subscriber.onNext(t)
                demand -= 1
              case ChannelClosed.Done => // only `data` can be done
                markCancelled() // 1.6: when signalling onComplete/onError, the subscription is considered cancelled
                subscriber.onComplete() // 1.5
              case ChannelClosed.Error(e) => // only `errors` can be closed due to an error
                failAndCancel(e) // not cancelled at this point, so the error is always signalled
          else
            // no demand: waiting for a signal from the subscriber, or for an error
            selectOrClosed(errors.receiveClause, signals.receiveClause) match
              case signals.Received(Signal.Request(n)) => addDemand(n)
              case signals.Received(Signal.Cancel)     => markCancelled()
              case errors.Received(_)                  => // impossible
              case ChannelClosed.Done                  => // impossible
              case ChannelClosed.Error(e) => // only `errors` can be closed due to an error
                failAndCancel(e) // not cancelled at this point, so the error is always signalled
        end while
      }.tapException: e =>
        // e might be an interrupted exception (the scope ends), or a bug; either way, letting downstream know
        if !errorSent then subscriber.onError(e)
  end runToSubscriber

end FlowReactiveOps

/** Signals sent from a [[Subscription]] to a running [[Publisher]]. */
private enum Signal:
  /** The subscriber requests `n` more elements; the publishing loop reports a non-positive `n` to the subscriber as an error. */
  case Request(n: Long)

  /** The subscriber cancels the subscription, which ends the publishing loop. */
  case Cancel

/** A [[Subscription]] which doesn't act on `request` / `cancel` calls itself, but forwards them as [[Signal]]s to the given sink.
  *
  * The numbers in the comments below are presumably rule numbers of the Reactive Streams specification.
  */
private class FlowSubscription(signals: Sink[Signal]) extends Subscription:
  // 3.2, 3.4: request/cancel can be called anytime, in a thread-safe way
  // 3.3: there's no recursion between request & onNext
  // 3.6: after a cancel, more requests can be sent to the channel, but they won't be processed (the cancel will be processed first)
  // 3.15: the signals channel is never closed
  def request(n: Long): Unit = signals.send(Signal.Request(n))
  // 3.5: as above for 3.2
  // 3.7: as above for 3.6
  // 3.16: as above for 3.15
  def cancel(): Unit = signals.send(Signal.Cancel)

  // 3.10, 3.11: no synchronous calls in this implementation
end FlowSubscription

// NOTE(review): nothing in the visible part of this file references this trait, and a trait of the same name (taking an
// `Ox ?=> Unit` function) is declared in package `ox` - this looks like a leftover; confirm before removing.
private trait ExternalScheduler:
  /** Runs the given function. */
  def run(f: () => Unit): Unit
16 changes: 9 additions & 7 deletions core/src/main/scala/ox/unsupervised.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,17 @@ private[ox] def scopedWithCapability[T](capability: Ox)(f: Ox ?=> T): T =
end if
end runFinalizers

// if this is inlined manually (in the program's text), it gets incorrectly indented
inline def runFAndJoinScope =
try f(using capability)
finally
scope.shutdown()
scope.join().discard

try
val t =
try
try f(using capability)
finally
scope.shutdown()
scope.join().discard
// join might have been interrupted
finally scope.close()
try runFAndJoinScope
finally scope.close() // join might have been interrupted, hence the finally

// running the finalizers only once we are sure that all child threads have been terminated, so that no new
// finalizers are added, and none are lost
Expand Down
Loading

0 comments on commit 95f83a7

Please sign in to comment.