Skip to content

Commit

Permalink
Merge pull request #501 from Banno/fix-facepalm-master
Browse files Browse the repository at this point in the history
fix: make `RecordStream` construction `Resource`-safe (-> `master`)
  • Loading branch information
Kazark authored Sep 15, 2021
2 parents 28ccd6c + 9fa50b1 commit 0f24ffc
Showing 1 changed file with 45 additions and 37 deletions.
82 changes: 45 additions & 37 deletions core/src/main/scala/com/banno/kafka/RecordStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ object HistoryAndStream {
history: Stream[F, A],
unbounded: Stream[F, A]
) extends HistoryAndUnbounded[F, Stream, A] {
override protected def unboundedStream: Stream[F, A] =unbounded
override protected def unboundedStream: Stream[F, A] = unbounded
}

def apply[F[_], A](
Expand Down Expand Up @@ -146,26 +146,38 @@ object RecordStream {
private sealed trait WhetherCommits[P[_[_], _]] {
def extrude[F[_], A](x: RecordStream[F, A]): P[F, A]

def chunk[F[_]: Applicative, A, B](
topical: Topical[A, B],
): P[F, IncomingRecords[A]] => P[F, A]
def chunk[F[_]: Applicative, A](
topical: Topical[A, ?],
stream: P[F, IncomingRecords[A]],
): P[F, A]

def historyAndUnbounded[F[_]: Async, A](
history: Stream[F, A],
unbounded: P[F, A]
): HistoryAndUnbounded[F, P, A]

final def chunkHistoryAndUnbounded[F[_]: Async, A](
topical: Topical[A, ?],
streams: HistoryAndUnbounded[F, P, IncomingRecords[A]]
): HistoryAndUnbounded[F, P, A] = {
historyAndUnbounded(
streams.history.flatMap(chunked),
chunk(topical, streams.unbounded)
)
}

def configs: List[(String, AnyRef)]
}

private object WhetherCommits {
object No extends WhetherCommits[Stream] {
override def extrude[F[_], A](x: RecordStream[F, A]): Stream[F, A] = x.records

override def chunk[F[_]: Applicative, A, B](
topical: Topical[A, B],
): Stream[F, IncomingRecords[A]] => Stream[F, A] =
_.flatMap(chunked)
override def chunk[F[_]: Applicative, A](
topical: Topical[A, ?],
stream: Stream[F, IncomingRecords[A]],
): Stream[F, A] =
stream.flatMap(chunked)

override def historyAndUnbounded[F[_]: Async, A](
history: Stream[F, A],
Expand All @@ -178,16 +190,16 @@ object RecordStream {
final case class May(groupId: GroupId) extends WhetherCommits[RecordStream] {
override def extrude[F[_], A](x: RecordStream[F, A]): RecordStream[F, A] = x

override def chunk[F[_]: Applicative, A, B](
topical: Topical[A, B],
): RecordStream[F, IncomingRecords[A]] => RecordStream[F, A] =
rs =>
new Impl[F, A](
rs match { case x: Impl[F, IncomingRecords[A]] => x.consumer }
) {
override protected def nextOffsets(x: A) = topical.nextOffset(x)
override def records: Stream[F, A] = rs.records.flatMap(chunked)
}
override def chunk[F[_]: Applicative, A](
topical: Topical[A, ?],
stream: RecordStream[F, IncomingRecords[A]],
): RecordStream[F, A] =
new Impl[F, A](
stream match { case x: Impl[F, IncomingRecords[A]] => x.consumer }
) {
override protected def nextOffsets(x: A) = topical.nextOffset(x)
override def records: Stream[F, A] = stream.records.flatMap(chunked)
}

override def historyAndUnbounded[F[_]: Async, A](
history: Stream[F, A],
Expand Down Expand Up @@ -262,34 +274,30 @@ object RecordStream {
sealed trait StreamSelector[F[_], G[_], P[_[_], _], A] {
private[RecordStream] def whetherCommits: WhetherCommits[P]
private[RecordStream] implicit val F: Async[F]
private[RecordStream] implicit val G: Apply[G]
// NOTE: To maintain linearity constraints, this cannot be anything more
// powerful than `Functor`.
private[RecordStream] implicit val G: Functor[G]

def unbounded: G[P[F, A]]
def historyAndUnbounded: G[HistoryAndUnbounded[F, P, A]]
def history: G[Stream[F, A]]

final def mapK[H[_]: Apply](f: G ~> H): StreamSelector[F, H, P, A] =
// NOTE: Linear use of `G` is important in case it is `Resource`.
final def mapK[H[_]: Functor](f: G ~> H): StreamSelector[F, H, P, A] =
StreamSelector.Impl(
f(history),
f(unbounded),
f(historyAndUnbounded),
whetherCommits,
)
}

private object StreamSelector {
final case class Impl[F[_], G[_], P[_[_], _], A](
history: G[Stream[F, A]],
unbounded: G[P[F, A]],
historyAndUnbounded: G[HistoryAndUnbounded[F, P, A]],
whetherCommits: WhetherCommits[P],
)(implicit val F: Async[F], val G: Apply[G])
)(implicit val F: Async[F], val G: Functor[G])
extends StreamSelector[F, G, P, A] {
override def historyAndUnbounded: G[HistoryAndUnbounded[F, P, A]] =
history.product(unbounded).map { pp =>
whetherCommits.historyAndUnbounded(
history = pp._1,
unbounded = pp._2,
)
}
override def history: G[Stream[F, A]] = historyAndUnbounded.map(_.history)
override def unbounded: G[P[F, A]] = historyAndUnbounded.map(_.unbounded)
}
}

Expand All @@ -300,10 +308,9 @@ object RecordStream {
batched: StreamSelector[F, G, P, IncomingRecords[A]],
topical: Topical[A, B],
): StreamSelector[F, G, P, A] = {
implicit val ap: Apply[G] = batched.G
implicit val ap: Functor[G] = batched.G
StreamSelector.Impl(
batched.history.map(_.flatMap(chunked)),
batched.unbounded.map(batched.whetherCommits.chunk(topical)),
batched.historyAndUnbounded.map(batched.whetherCommits.chunkHistoryAndUnbounded(topical, _)),
batched.whetherCommits,
)
}
Expand All @@ -319,7 +326,7 @@ object RecordStream {
): Resource[F, RecordStream[F, A]] =
batched
.to(topical, reset)
.map(whetherCommits.chunk(topical))
.map(whetherCommits.chunk(topical, _))
}

sealed trait ConfigStage1 {
Expand Down Expand Up @@ -544,7 +551,8 @@ object RecordStream {
.evalMap(parseBatch(topical))
val unbounded: NeedsConsumer[F, P[F, IncomingRecords[A]]] =
c => whetherCommits.extrude(recordStream(c, topical))
StreamSelector.Impl(history, unbounded, whetherCommits)
val hAndU = history.product(unbounded).map(hAndU => whetherCommits.historyAndUnbounded(hAndU._1, hAndU._2))
StreamSelector.Impl(hAndU, whetherCommits)
}

private def assign[F[_]: Monad, A, B](
Expand Down

0 comments on commit 0f24ffc

Please sign in to comment.