Added batching producer ops (#906)
* Added several batched send, pipe and sink convenience methods to ProducerOps.
* Added tests
* Added scaladoc for new methods
* Updated docker-compose to docker compose

---------

Co-authored-by: Zach Cox <[email protected]>
milessabin and zcox authored Aug 2, 2024
1 parent 23b0149 commit 221dc73
Showing 4 changed files with 396 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -57,7 +57,7 @@ jobs:

- name: Start docker-compose
id: start-docker-compose
-       run: docker-compose up -d
+       run: docker compose up -d

- name: Check that workflows are up to date
run: sbt githubWorkflowCheck
2 changes: 1 addition & 1 deletion build.sbt
@@ -12,7 +12,7 @@ ThisBuild / githubWorkflowBuildPreamble := Seq(
WorkflowStep.Run(
id = Some("start-docker-compose"),
name = Some("Start docker-compose"),
-       commands = List("docker-compose up -d"),
+       commands = List("docker compose up -d"),
)
)
ThisBuild / tlSonatypeUseLegacyHost := true
183 changes: 183 additions & 0 deletions core/src/main/scala/com/banno/kafka/producer/ProducerOps.scala
@@ -17,6 +17,7 @@
package com.banno.kafka.producer

import cats.*
import cats.data.NonEmptyList
import cats.syntax.all.*
import fs2.*
import org.apache.kafka.common.*
@@ -35,15 +36,197 @@ case class ProducerOps[F[_], K, V](producer: ProducerApi[F, K, V]) {
)(implicit F: Applicative[F]): F[G[RecordMetadata]] =
records.traverse(producer.sendAsync)

/** Sends all of the possibly empty collection of records to the producer
* (synchronously), so the producer may batch them. After all records are
* sent, asynchronously waits for all acks.
*
* Returns the write metadatas, in order. Fails if any individual send or ack
* fails.
*
* This, and related batch write operations, allow the producer to perform
* its own batching, while semantically blocking until all writes have
* succeeded. It maximizes concurrency and producer batching, and also
* simplicity of usage.
*/
def sendBatch[G[_]: Traverse](
records: G[ProducerRecord[K, V]]
)(implicit F: Monad[F]): F[G[RecordMetadata]] = {
val sends: G[F[F[RecordMetadata]]] = records.map(producer.send)
for {
acks <- sends.sequence
rms <- acks.sequence
} yield rms
}
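
// Illustrative usage sketch, assuming `F` is cats.effect.IO and a hypothetical
// `producer: ProducerApi[IO, String, String]`; sends three records as a single
// producer batch and collects their acks in order:
//
//   val acks: IO[List[RecordMetadata]] =
//     ProducerOps(producer).sendBatch(
//       List("a", "b", "c").map(v => new ProducerRecord("example-topic", v, v))
//     )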

/** Sends all of the non-empty collection of records to the producer
* (synchronously), so the producer may batch them. After all records are
* sent, asynchronously waits for all acks.
*
* Returns the write metadatas, in order. Fails if any individual send or ack
* fails.
*
* This, and related batch write operations, allow the producer to perform
* its own batching, while semantically blocking until all writes have
* succeeded. It maximizes concurrency and producer batching, and also
* simplicity of usage.
*/
def sendBatchNonEmpty[G[_]: NonEmptyTraverse](
records: G[ProducerRecord[K, V]]
)(implicit F: FlatMap[F]): F[G[RecordMetadata]] = {
val sends: G[F[F[RecordMetadata]]] = records.map(producer.send)
for {
acks <- sends.nonEmptySequence
rms <- acks.nonEmptySequence
} yield rms
}
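
// Illustrative usage sketch with the same hypothetical IO-based producer;
// because nonEmptySequence never needs `pure`, only FlatMap[F] is required here:
//
//   val acks: IO[NonEmptyList[RecordMetadata]] =
//     ProducerOps(producer).sendBatchNonEmpty(
//       NonEmptyList.of("a", "b", "c").map(v => new ProducerRecord("example-topic", v, v))
//     )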

/** Sends all of the possibly empty collection of records to the producer
* (synchronously), so the producer may batch them. After all records are
* sent, asynchronously waits for all acks.
*
* Calls the `onSend` callback for each record, after it is sent.
*
* Returns the write metadatas, in order. Fails if any individual send or ack
* fails.
*
* This, and related batch write operations, allow the producer to perform
* its own batching, while semantically blocking until all writes have
* succeeded. It maximizes concurrency and producer batching, and also
* simplicity of usage.
*/
def sendBatchWithCallbacks[G[_]: Traverse](
records: G[ProducerRecord[K, V]],
onSend: ProducerRecord[K, V] => F[Unit],
)(implicit F: Monad[F]): F[G[RecordMetadata]] = {
val sends: G[F[F[RecordMetadata]]] =
records.map(r => producer.send(r) <* onSend(r))
for {
acks <- sends.sequence
rms <- acks.sequence
} yield rms
}
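
// Illustrative usage sketch, assuming cats.effect.IO and hypothetical `producer`
// and `records` values; logs each record's key right after it is handed to the
// producer, then waits for all acks:
//
//   val acks: IO[List[RecordMetadata]] =
//     ProducerOps(producer).sendBatchWithCallbacks(
//       records,
//       r => IO.println(s"sent key=${r.key}")
//     )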

/** Sends all of the non-empty collection of records to the producer
* (synchronously), so the producer may batch them. After all records are
* sent, asynchronously waits for all acks.
*
* Calls the `onSend` callback for each record, after it is sent.
*
* Returns the write metadatas, in order. Fails if any individual send or ack
* fails.
*
* This, and related batch write operations, allow the producer to perform
* its own batching, while semantically blocking until all writes have
* succeeded. It maximizes concurrency and producer batching, and also
* simplicity of usage.
*/
def sendBatchNonEmptyWithCallbacks[G[_]: NonEmptyTraverse](
records: G[ProducerRecord[K, V]],
onSend: ProducerRecord[K, V] => F[Unit],
)(implicit F: FlatMap[F]): F[G[RecordMetadata]] = {
val sends: G[F[F[RecordMetadata]]] =
records.map(r => producer.send(r) <* onSend(r))
for {
acks <- sends.nonEmptySequence
rms <- acks.nonEmptySequence
} yield rms
}
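
// Illustrative usage sketch: the NonEmptyList counterpart of the callback
// example above, again assuming a hypothetical IO-based producer:
//
//   val acks: IO[NonEmptyList[RecordMetadata]] =
//     ProducerOps(producer).sendBatchNonEmptyWithCallbacks(
//       NonEmptyList.one(new ProducerRecord("example-topic", "k", "v")),
//       r => IO.println(s"sent key=${r.key}")
//     )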

/** Returns a Pipe which transforms a stream of records into a stream of
* record metadatas, by sending each record to the producer and waiting for
* the ack.
*/
def pipeAsync: Pipe[F, ProducerRecord[K, V], RecordMetadata] =
_.evalMap(producer.sendAsync)
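
// Illustrative usage sketch, assuming a hypothetical
// `records: Stream[IO, ProducerRecord[String, String]]`; each record is sent and
// its ack awaited before the next record is sent:
//
//   val metas: Stream[IO, RecordMetadata] =
//     records.through(ProducerOps(producer).pipeAsync)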

/** Returns a Pipe which transforms a stream of possibly empty collections of
* records into a stream of record metadatas, by sending each collection to
* the producer as a batch and waiting for the ack.
*/
def pipeSendBatch[G[_]: Traverse](implicit
F: Monad[F]
): Pipe[F, G[ProducerRecord[K, V]], G[RecordMetadata]] =
_.evalMap(sendBatch[G])
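
// Illustrative usage sketch, assuming a hypothetical
// `batches: Stream[IO, List[ProducerRecord[String, String]]]`; each emitted List
// is written as one producer batch:
//
//   val metas: Stream[IO, List[RecordMetadata]] =
//     batches.through(ProducerOps(producer).pipeSendBatch[List])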

/** Returns a Pipe which transforms a stream of non-empty collections of
* records into a stream of record metadatas, by sending each collection to
* the producer as a batch and waiting for the ack.
*/
def pipeSendBatchNonEmpty[G[_]: NonEmptyTraverse](implicit
F: FlatMap[F]
): Pipe[F, G[ProducerRecord[K, V]], G[RecordMetadata]] =
_.evalMap(sendBatchNonEmpty[G])

/** Returns a Pipe which transforms a stream of records into a stream of
* record metadatas, by using the stream's chunks as batches of records to
* send to the producer.
*/
def pipeSendBatchChunks(implicit
F: FlatMap[F]
): Pipe[F, ProducerRecord[K, V], RecordMetadata] =
s =>
pipeSendBatchNonEmpty[NonEmptyList](NonEmptyTraverse[NonEmptyList], F)(
s.chunks.map(_.toNel).unNone
).flatMap(nel => Stream.emits(nel.toList))
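
// Illustrative usage sketch, assuming a hypothetical
// `records: Stream[IO, ProducerRecord[String, String]]`; whatever chunks the
// stream already carries become the producer batches:
//
//   val metas: Stream[IO, RecordMetadata] =
//     records.through(ProducerOps(producer).pipeSendBatchChunks)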

/** Returns a Pipe which transforms a stream of records into a stream of
* record metadatas, by calling chunkN on the stream, to create chunks of
* size `n`, and sending those chunks as batches to the producer.
*/
def pipeSendBatchChunkN(n: Int, allowFewer: Boolean = true)(implicit
F: Monad[F]
): Pipe[F, ProducerRecord[K, V], RecordMetadata] =
s =>
pipeSendBatch[Chunk](Traverse[Chunk], F)(s.chunkN(n, allowFewer))
.flatMap(Stream.chunk)
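
// Illustrative usage sketch: regroup a hypothetical record stream into batches
// of at most 100 before sending, assuming the same IO-based producer:
//
//   val metas: Stream[IO, RecordMetadata] =
//     records.through(ProducerOps(producer).pipeSendBatchChunkN(100))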

/** Returns a Pipe which transforms a stream of records by sending each record
* to the producer without waiting for the ack.
*/
def sink: Pipe[F, ProducerRecord[K, V], Unit] =
_.evalMap(producer.sendAndForget)

/** Returns a Pipe which transforms a stream of records by sending each record
* to the producer and waiting for the ack.
*/
def sinkAsync: Pipe[F, ProducerRecord[K, V], Unit] =
pipeAsync.apply(_).void
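
// Illustrative usage sketch, assuming a hypothetical IO-based record stream:
// `sink` enqueues each record without waiting for its ack, while `sinkAsync`
// waits for every ack:
//
//   val fireAndForget: Stream[IO, Unit] = records.through(ProducerOps(producer).sink)
//   val acked: Stream[IO, Unit]         = records.through(ProducerOps(producer).sinkAsync)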

/** Returns a Pipe which transforms a stream of possibly empty collections of
* records by sending each collection to the producer as a batch and waiting
* for the ack.
*/
def sinkSendBatch[G[_]: Traverse](implicit
F: Monad[F]
): Pipe[F, G[ProducerRecord[K, V]], Unit] =
pipeSendBatch.apply(_).void

/** Returns a Pipe which transforms a stream of non-empty collections of
* records by sending each collection to the producer as a batch and waiting
* for the ack.
*/
def sinkSendBatchNonEmpty[G[_]: NonEmptyTraverse](implicit
F: FlatMap[F]
): Pipe[F, G[ProducerRecord[K, V]], Unit] =
pipeSendBatchNonEmpty.apply(_).void

/** Returns a Pipe which transforms a stream of records by using the stream's
* chunks as batches of records to send to the producer.
*/
def sinkSendBatchChunks(implicit
F: FlatMap[F]
): Pipe[F, ProducerRecord[K, V], Unit] =
pipeSendBatchChunks.apply(_).void

/** Returns a Pipe which transforms a stream of records by calling chunkN on
* the stream, to create chunks of size `n`, and sending those chunks as
* batches to the producer.
*/
def sinkSendBatchChunkN(n: Int, allowFewer: Boolean = true)(implicit
F: Monad[F]
): Pipe[F, ProducerRecord[K, V], Unit] =
pipeSendBatchChunkN(n, allowFewer).apply(_).void
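
// Illustrative usage sketch: drain a hypothetical record stream to Kafka in
// producer batches of 500, discarding the returned metadata, assuming cats.effect.IO:
//
//   val write: IO[Unit] =
//     records.through(ProducerOps(producer).sinkSendBatchChunkN(500)).compile.drain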

def transaction[G[_]: Foldable](
records: G[ProducerRecord[K, V]]
)(implicit F: MonadError[F, Throwable]): F[Unit] =
