Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Circuit breaker #266

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
90ff17b
AtomicCircularBuffer and skeleton on CircuitBreaker
Kamil-Lontkowski Jan 9, 2025
8447a0f
CircuitBreakerCountStateMachine
Kamil-Lontkowski Jan 9, 2025
9951f29
WIP
Kamil-Lontkowski Jan 10, 2025
ed9985c
breaker based on actor
Kamil-Lontkowski Jan 13, 2025
11d8005
Don't use atomics inside state machine
Kamil-Lontkowski Jan 14, 2025
72ab369
Delete out of date TODOs
Kamil-Lontkowski Jan 14, 2025
81ee70e
Refactor nextState to be pure
Kamil-Lontkowski Jan 15, 2025
5c94eda
fixes and tests for state machine
Kamil-Lontkowski Jan 16, 2025
0a26e2f
scaladoc, move logic to different files
Kamil-Lontkowski Jan 16, 2025
20ed3ff
Refactor nextState to object, use composition instead of inheritance
Kamil-Lontkowski Jan 20, 2025
c617c66
don't go through all results on every call
Kamil-Lontkowski Jan 20, 2025
64d694f
CircuiBreaker docs calculate rolling metrics, more tests
Kamil-Lontkowski Jan 20, 2025
f42e74d
added bigger time margin for test, track metrics per result
Kamil-Lontkowski Jan 20, 2025
6f376eb
use removeHeadWhile instead of filterInPlace
Kamil-Lontkowski Jan 20, 2025
3581bd9
introduce PercentageThreshold type
Kamil-Lontkowski Jan 20, 2025
55afda2
Try to fix tests
adamw Jan 21, 2025
6e34c70
Fix helper method conflict
adamw Jan 21, 2025
09cc330
Failing test
adamw Jan 22, 2025
5be4f24
fix edge case for last completed call in halfOpen state
Kamil-Lontkowski Jan 22, 2025
9139982
Add test case for wrong calculation of metrics
Kamil-Lontkowski Jan 22, 2025
24e70b7
Don't count metrics refistered with different state, fix test
Kamil-Lontkowski Jan 22, 2025
cf7dcea
docs
Kamil-Lontkowski Jan 22, 2025
4e01101
docs grammar fixes, better working example
Kamil-Lontkowski Jan 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions core/src/main/scala/ox/resilience/CircuitBreaker.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package ox.resilience

import scala.concurrent.duration.*
import ox.*
import java.util.concurrent.Semaphore
import ox.channels.Actor
import ox.channels.BufferCapacity
import ox.channels.ActorRef
import scala.util.Try

private[resilience] enum CircuitBreakerState:
case Open(since: Long)
case Closed(since: Long)
case HalfOpen(since: Long, semaphore: Semaphore, completedOperations: Int = 0)

private[resilience] enum CircuitBreakerResult:
case Success
case Failure
case Slow

private[resilience] case class Metrics(
failureRate: Int,
Kamil-Lontkowski marked this conversation as resolved.
Show resolved Hide resolved
slowCallsRate: Int,
operationsInWindow: Int,
lastAcquisitionResult: Option[AcquireResult],
timestamp: Long
)

private[resilience] case class AcquireResult(acquired: Boolean, circuitState: CircuitBreakerState)

private case class CircuitBreakerStateMachineConfig(
failureRateThreshold: Int,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we use PercentageThreshold here as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really couldn't decide. This is internal config, so I would need to unwrap it everywhere where it's used.

slowCallThreshold: Int,
slowCallDurationThreshold: FiniteDuration,
minimumNumberOfCalls: Int,
numberOfCallsInHalfOpenState: Int,
waitDurationOpenState: FiniteDuration,
halfOpenTimeoutDuration: FiniteDuration
)
private object CircuitBreakerStateMachineConfig:
def fromConfig(c: CircuitBreakerConfig): CircuitBreakerStateMachineConfig =
CircuitBreakerStateMachineConfig(
failureRateThreshold = c.failureRateThreshold.toInt,
slowCallThreshold = c.slowCallThreshold.toInt,
slowCallDurationThreshold = c.slowCallDurationThreshold,
minimumNumberOfCalls = c.minimumNumberOfCalls,
numberOfCallsInHalfOpenState = c.numberOfCallsInHalfOpenState,
waitDurationOpenState = c.waitDurationOpenState,
halfOpenTimeoutDuration = c.halfOpenTimeoutDuration
)
end CircuitBreakerStateMachineConfig

/** Circuit Breaker. Operations can be dropped, when the breaker is open or if it doesn't take more operation in halfOpen state. The Circuit
* Breaker might calculate different metrics based on [[SlidingWindow]] provided in config. See [[SlidingWindow]] for more details.
*/
case class CircuitBreaker(config: CircuitBreakerConfig)(using Ox):
private[resilience] val stateMachine = CircuitBreakerStateMachine(config)
private val actorRef: ActorRef[CircuitBreakerStateMachine] = Actor.create(stateMachine)(using sc = BufferCapacity.apply(100))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe the CB constructor could also accept a BufferCapacity parameter, to allow configuring how much operations to buffer; we could provide a default of 100 there


private def tryAcquire: AcquireResult = stateMachine.state match
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by convention, side-effecting parameterless operations should have (), so: tryAcquire(); but hasPermits would have no (), as it doesn't change the state

case currState @ CircuitBreakerState.Closed(_) => AcquireResult(true, currState)
case currState @ CircuitBreakerState.Open(_) => AcquireResult(false, currState)
case currState @ CircuitBreakerState.HalfOpen(_, semaphore, _) => AcquireResult(semaphore.tryAcquire(1), currState)

/** Runs the operation using the given error mode or drops it if the breaker is open.
* @param em
* The error mode to use, which specifies when a result value is considered success, and when a failure.
* @param operation
* The operation to run.
* @return
* `Some` if the operation has been run, `None` if the operation has been dropped.
*/
def runOrDropWithErrorMode[E, F[_], T](em: ErrorMode[E, F])(
operation: => F[T]
): Option[F[T]] =
val acquiredResult = tryAcquire
if acquiredResult.acquired then
val (duration, result) = timed(operation)
if em.isError(result) then
actorRef.tell(_.registerResult(CircuitBreakerResult.Failure, acquiredResult, actorRef))
Some(result)
else
if duration > config.slowCallDurationThreshold then
actorRef.tell(_.registerResult(CircuitBreakerResult.Slow, acquiredResult, actorRef))
else actorRef.tell(_.registerResult(CircuitBreakerResult.Success, acquiredResult, actorRef))
Some(result)
end if
else None
end if
end runOrDropWithErrorMode

/** Runs the operation returning [[scala.util.Either]] or drops it if the breaker is open. Note that any exceptions thrown by the
* operation aren't caught and are propagated to user.
*
* @param operation
* The operation to run.
* @return
* `Some` if the operation has been run, `None` if the operation has been dropped.
* @throws anything
* The exception thrown by operation.
*/
def runOrDropEither[E, T](
operation: => Either[E, T]
): Option[Either[E, T]] =
runOrDropWithErrorMode(EitherMode[E])(operation)

/** Runs the operation or drops it if the breaker is open returning a direct result wrapped in [[Option]]
*
* @param operation
* The operation to run.
* @return
* `Some` if the operation has been run, `None` if the operation has been dropped.
* @throws anything
* The exception thrown by operation.
*/
def runOrDrop[T](operation: => T): Option[T] =
runOrDropEither(Try(operation).toEither).map(_.fold(throw _, identity))
end CircuitBreaker
78 changes: 78 additions & 0 deletions core/src/main/scala/ox/resilience/CircuitBreakerConfig.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package ox.resilience

import scala.concurrent.duration.*
import java.util.concurrent.TimeUnit

/** Allows to configure how [[Metrics]] will be calculated
*/
enum SlidingWindow:
/** Window counting last n operations when calculating metrics.
* @param windowSize
* number of last n results recored.
*/
case CountBased(windowSize: Int)

/** Window counting operations in the lapse of `duraiton` before current time.
* @param duration
* span of time where results are considered for including in metrics.
*/
case TimeBased(duration: FiniteDuration)
end SlidingWindow

/** Type representing percentage threshold between 0 and 100 */
opaque type PercentageThreshold = Int

extension (c: PercentageThreshold) def toInt: Int = c

object PercentageThreshold:
def apply(c: Int): PercentageThreshold =
assert(c >= 0 && c <= 100, s"PercentageThreshold must be between 0 and 100, value: $c")
c

/** @param failureRateThreshold
* threshold, as percentage of operations that ended in failure
* @param slowCallThreshold
* threshold, as percentage of operations that spanned more then [[slowCallDurationThreshold]].
* @param slowCallDurationThreshold
* time after which operation is considered slow.
* @param slidingWindow
* configures how thresholds will be calculated. See [[SlidingWindow]] for more details.
* @param minimumNumberOfCalls
* minimum number of results that must be registered before metrics are calculated.
* @param waitDurationOpenState
* how much time will pass before breaker will switch from open to half open state.
* @param halfOpenTimeoutDuration
* time out after which, if not enough calls where registered in half open state, breaker will go back to open state.
* @param numberOfCallsInHalfOpenState
* number of results that must be registered to calculate metrics and decide if breaker should go back to open state or close. This is
* also maximum number of operations that can be started in half open state.
*/
case class CircuitBreakerConfig(
failureRateThreshold: PercentageThreshold,
slowCallThreshold: PercentageThreshold,
slowCallDurationThreshold: FiniteDuration,
slidingWindow: SlidingWindow,
minimumNumberOfCalls: Int,
waitDurationOpenState: FiniteDuration,
halfOpenTimeoutDuration: FiniteDuration,
numberOfCallsInHalfOpenState: Int
):

assert(
numberOfCallsInHalfOpenState > 0,
s"numberOfCallsInHalfOpenState must be greater than 0, value: $numberOfCallsInHalfOpenState"
)
end CircuitBreakerConfig

object CircuitBreakerConfig:
def default: CircuitBreakerConfig = CircuitBreakerConfig(
failureRateThreshold = PercentageThreshold(50),
slowCallThreshold = PercentageThreshold(50),
slowCallDurationThreshold = 60.seconds,
slidingWindow = SlidingWindow.CountBased(100),
minimumNumberOfCalls = 20,
waitDurationOpenState = FiniteDuration(10, TimeUnit.SECONDS),
halfOpenTimeoutDuration = FiniteDuration(0, TimeUnit.MILLISECONDS),
numberOfCallsInHalfOpenState = 10
)
end CircuitBreakerConfig
Loading