Skip to content

Commit

Permalink
Merge pull request #3347 from BalmungSan/fix-take-flow
Browse files Browse the repository at this point in the history
Fix subte bug in the interop.flow/StreamSubscription
  • Loading branch information
armanbilge authored Nov 25, 2023
2 parents 00fb259 + 06baa72 commit 53f458d
Showing 1 changed file with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ private[flow] final class StreamSubscription[F[_], A] private (
def run: F[Unit] = {
val subscriptionPipe: Pipe[F, A, A] = in => {
def go(s: Stream[F, A]): Pull[F, A, Unit] =
Pull.eval(F.delay(requests.get())).flatMap { n =>
Pull.eval(F.delay(requests.getAndSet(0))).flatMap { n =>
if (n == Long.MaxValue)
// See: https://github.com/reactive-streams/reactive-streams-jvm#3.17
s.pull.echo
else if (n == 0)
Pull.eval(F.asyncCheckAttempt[Unit] { cb =>
Expand All @@ -85,11 +86,11 @@ private[flow] final class StreamSubscription[F[_], A] private (
}
}) >> go(s)
else
Pull.eval(F.delay(requests.getAndAdd(-n))) >>
s.pull.take(n).flatMap {
case None => Pull.done
case Some(rem) => go(rem)
}
// We take the requested elements from the stream until we exhaust it.
s.pull.take(n).flatMap {
case None => Pull.done
case Some(rem) => go(rem)
}
}

go(in).stream
Expand Down

0 comments on commit 53f458d

Please sign in to comment.