diff --git a/core/src/main/scala/ox/resilience/CircuitBreakerStateMachine.scala b/core/src/main/scala/ox/resilience/CircuitBreakerStateMachine.scala index 21590389..2fd03286 100644 --- a/core/src/main/scala/ox/resilience/CircuitBreakerStateMachine.scala +++ b/core/src/main/scala/ox/resilience/CircuitBreakerStateMachine.scala @@ -5,10 +5,11 @@ import ox.* import java.util.concurrent.Semaphore import ox.channels.ActorRef import ox.resilience.CircuitBreakerStateMachine.nextState +import scala.compiletime.ops.double private[resilience] case class CircuitBreakerStateMachine( config: CircuitBreakerStateMachineConfig, - results: CircuitBreakerResults + private val results: CircuitBreakerResults )(using val ox: Ox): @volatile private var _state: CircuitBreakerState = CircuitBreakerState.Closed @@ -115,16 +116,22 @@ private[resilience] object CircuitBreakerResults: private var failedCalls = 0 private var successCalls = 0 + private def clearResults: Unit = + results.clear() + slowCalls = 0 + failedCalls = 0 + successCalls = 0 + def onStateChange(oldState: CircuitBreakerState, newState: CircuitBreakerState): Unit = import CircuitBreakerState.* // we have to match so we don't reset result when for example incrementing completed calls in halfopen state (oldState, newState) match case (Closed, Open(_) | HalfOpen(_, _, _)) => - results.clear() + clearResults case (HalfOpen(_, _, _), Open(_) | Closed) => - results.clear() + clearResults case (Open(_), Closed | HalfOpen(_, _, _)) => - results.clear() + clearResults case (_, _) => () end match end onStateChange @@ -159,14 +166,31 @@ private[resilience] object CircuitBreakerResults: case class TimeWindowBased(windowDuration: FiniteDuration)(using ox: Ox) extends CircuitBreakerResults(using ox): // holds timestamp of recored operation and result - private val queue = collection.mutable.Queue[(Long, CircuitBreakerResult)]() + private val results = collection.mutable.ArrayDeque[(Long, CircuitBreakerResult)]() + private var slowCalls = 0 + private var failedCalls = 0 + private var successCalls = 0 + + private def clearResults(): Unit = + results.clear() + slowCalls = 0 + failedCalls = 0 + successCalls = 0 def calculateMetrics(lastAcquisitionResult: Option[AcquireResult], timestamp: Long): Metrics = // filter all entries that happend outside sliding window - val results = queue.filterInPlace((time, _) => timestamp > time + windowDuration.toMillis) - val numOfOperations = results.length - val failuresRate = ((results.count(_ == CircuitBreakerResult.Failure) / numOfOperations.toFloat) * 100).toInt - val slowRate = ((results.count(_ == CircuitBreakerResult.Slow) / numOfOperations.toFloat) * 100).toInt + val res = results.filterInPlace { (time, result) => + val isOlder = timestamp > time + windowDuration.toMillis + if isOlder then + result match + case CircuitBreakerResult.Success => successCalls -= 1 + case CircuitBreakerResult.Failure => failedCalls -= 1 + case CircuitBreakerResult.Slow => slowCalls -= 1 + isOlder + } + val numOfOperations = res.length + val failuresRate = ((failedCalls / numOfOperations.toFloat) * 100).toInt + val slowRate = ((slowCalls / numOfOperations.toFloat) * 100).toInt Metrics( failuresRate, slowRate, @@ -177,18 +201,22 @@ private[resilience] object CircuitBreakerResults: end calculateMetrics def updateResults(result: CircuitBreakerResult): Unit = - queue.addOne((System.currentTimeMillis(), result)) + result match + case CircuitBreakerResult.Success => successCalls += 1 + case CircuitBreakerResult.Failure => failedCalls += 1 + case CircuitBreakerResult.Slow => slowCalls += 1 + results.addOne((System.currentTimeMillis(), result)) def onStateChange(oldState: CircuitBreakerState, newState: CircuitBreakerState): Unit = import CircuitBreakerState.* // we have to match so we don't reset result when for example incrementing completed calls in halfopen state (oldState, newState) match case (Closed, Open(_) | HalfOpen(_, _, _)) => - queue.clear() + clearResults() case (HalfOpen(_, _, _), Open(_) | Closed) => - queue.clear() + clearResults() case (Open(_), Closed | HalfOpen(_, _, _)) => - queue.clear() + clearResults() case (_, _) => () end match end onStateChange diff --git a/core/src/test/scala/ox/resilience/CircuitBreakerTest.scala b/core/src/test/scala/ox/resilience/CircuitBreakerTest.scala index 854705af..ee12731f 100644 --- a/core/src/test/scala/ox/resilience/CircuitBreakerTest.scala +++ b/core/src/test/scala/ox/resilience/CircuitBreakerTest.scala @@ -133,8 +133,8 @@ class CircuitBreakerTest extends AnyFlatSpec with Matchers with OptionValues wit minimumNumberOfCalls = numberOfOperations, slidingWindow = SlidingWindow.CountBased(numberOfOperations), numberOfCallsInHalfOpenState = 1, - waitDurationOpenState = 10.millis, - halfOpenTimeoutDuration = 1.second + waitDurationOpenState = 1.seconds, + halfOpenTimeoutDuration = 2.seconds ) ) def f(): Either[String, String] = @@ -142,9 +142,9 @@ class CircuitBreakerTest extends AnyFlatSpec with Matchers with OptionValues wit // when val result1 = circuitBreaker.runOrDropEither(f()) // trigger swithc to open - sleep(100.millis) // wait for state to register, and for switch to half open + sleep(1500.millis) // wait for state to register, and for switch to half open val state = circuitBreaker.stateMachine.state - sleep(1500.millis) // wait longer than half open timeout + sleep(2500.millis) // wait longer than half open timeout val stateAfterWait = circuitBreaker.stateMachine.state // then