Skip to content

Commit

Permalink
Merge pull request #476 from Banno/qwer
Browse files Browse the repository at this point in the history
feat: configs seam for `RecordStream.Batched.assign` (-> `master`)
  • Loading branch information
Kazark authored Aug 19, 2021
2 parents 37349c3 + b1b6e1a commit 3619ad4
Showing 1 changed file with 49 additions and 0 deletions.
49 changes: 49 additions & 0 deletions core/src/main/scala/com/banno/kafka/RecordStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -669,5 +669,54 @@ object RecordStream {
WhetherCommits.May(groupId),
)
)

def assign[F[_]: Async](
kafkaBootstrapServers: BootstrapServers,
schemaRegistryUri: SchemaRegistryUrl,
clientId: String,
configs: Map[String, AnyRef],
): Assigner[F, IncomingRecords, Stream] =
AssignerImpl(
BaseConfigs(
kafkaBootstrapServers,
schemaRegistryUri,
clientId,
WhetherCommits.No,
configs.toList: _*
)
)

def assign[F[_]: Async](
kafkaBootstrapServers: BootstrapServers,
schemaRegistryUri: SchemaRegistryUrl,
groupId: GroupId,
configs: Map[String, AnyRef],
): Assigner[F, IncomingRecords, RecordStream] =
AssignerImpl(
BaseConfigs(
kafkaBootstrapServers,
schemaRegistryUri,
groupId.id,
WhetherCommits.May(groupId),
configs.toList: _*
)
)

def assign[F[_]: Async](
kafkaBootstrapServers: BootstrapServers,
schemaRegistryUri: SchemaRegistryUrl,
clientId: String,
groupId: GroupId,
configs: Map[String, AnyRef],
): Assigner[F, IncomingRecords, RecordStream] =
AssignerImpl(
BaseConfigs(
kafkaBootstrapServers,
schemaRegistryUri,
clientId,
WhetherCommits.May(groupId),
configs.toList: _*
)
)
}
}

0 comments on commit 3619ad4

Please sign in to comment.