Skip to content

Commit

Permalink
added bigger time margin for test, track metrics per result
Browse files Browse the repository at this point in the history
  • Loading branch information
Kamil-Lontkowski committed Jan 20, 2025
1 parent 64d694f commit f42e74d
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 17 deletions.
54 changes: 41 additions & 13 deletions core/src/main/scala/ox/resilience/CircuitBreakerStateMachine.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import ox.*
import java.util.concurrent.Semaphore
import ox.channels.ActorRef
import ox.resilience.CircuitBreakerStateMachine.nextState
import scala.compiletime.ops.double

private[resilience] case class CircuitBreakerStateMachine(
config: CircuitBreakerStateMachineConfig,
results: CircuitBreakerResults
private val results: CircuitBreakerResults
)(using val ox: Ox):
@volatile private var _state: CircuitBreakerState = CircuitBreakerState.Closed

Expand Down Expand Up @@ -115,16 +116,22 @@ private[resilience] object CircuitBreakerResults:
private var failedCalls = 0
private var successCalls = 0

private def clearResults: Unit =
results.clear()
slowCalls = 0
failedCalls = 0
successCalls = 0

def onStateChange(oldState: CircuitBreakerState, newState: CircuitBreakerState): Unit =
import CircuitBreakerState.*
// we have to match so we don't reset result when for example incrementing completed calls in halfopen state
(oldState, newState) match
case (Closed, Open(_) | HalfOpen(_, _, _)) =>
results.clear()
clearResults
case (HalfOpen(_, _, _), Open(_) | Closed) =>
results.clear()
clearResults
case (Open(_), Closed | HalfOpen(_, _, _)) =>
results.clear()
clearResults
case (_, _) => ()
end match
end onStateChange
Expand Down Expand Up @@ -159,14 +166,31 @@ private[resilience] object CircuitBreakerResults:

case class TimeWindowBased(windowDuration: FiniteDuration)(using ox: Ox) extends CircuitBreakerResults(using ox):
// holds timestamp of recored operation and result
private val queue = collection.mutable.Queue[(Long, CircuitBreakerResult)]()
private val results = collection.mutable.ArrayDeque[(Long, CircuitBreakerResult)]()
private var slowCalls = 0
private var failedCalls = 0
private var successCalls = 0

private def clearResults(): Unit =
results.clear()
slowCalls = 0
failedCalls = 0
successCalls = 0

def calculateMetrics(lastAcquisitionResult: Option[AcquireResult], timestamp: Long): Metrics =
// filter all entries that happend outside sliding window
val results = queue.filterInPlace((time, _) => timestamp > time + windowDuration.toMillis)
val numOfOperations = results.length
val failuresRate = ((results.count(_ == CircuitBreakerResult.Failure) / numOfOperations.toFloat) * 100).toInt
val slowRate = ((results.count(_ == CircuitBreakerResult.Slow) / numOfOperations.toFloat) * 100).toInt
val res = results.filterInPlace { (time, result) =>
val isOlder = timestamp > time + windowDuration.toMillis
if isOlder then
result match
case CircuitBreakerResult.Success => successCalls -= 1
case CircuitBreakerResult.Failure => failedCalls -= 1
case CircuitBreakerResult.Slow => slowCalls -= 1
isOlder
}
val numOfOperations = res.length
val failuresRate = ((failedCalls / numOfOperations.toFloat) * 100).toInt
val slowRate = ((slowCalls / numOfOperations.toFloat) * 100).toInt
Metrics(
failuresRate,
slowRate,
Expand All @@ -177,18 +201,22 @@ private[resilience] object CircuitBreakerResults:
end calculateMetrics

def updateResults(result: CircuitBreakerResult): Unit =
queue.addOne((System.currentTimeMillis(), result))
result match
case CircuitBreakerResult.Success => successCalls += 1
case CircuitBreakerResult.Failure => failedCalls += 1
case CircuitBreakerResult.Slow => slowCalls += 1
results.addOne((System.currentTimeMillis(), result))

def onStateChange(oldState: CircuitBreakerState, newState: CircuitBreakerState): Unit =
import CircuitBreakerState.*
// we have to match so we don't reset result when for example incrementing completed calls in halfopen state
(oldState, newState) match
case (Closed, Open(_) | HalfOpen(_, _, _)) =>
queue.clear()
clearResults()
case (HalfOpen(_, _, _), Open(_) | Closed) =>
queue.clear()
clearResults()
case (Open(_), Closed | HalfOpen(_, _, _)) =>
queue.clear()
clearResults()
case (_, _) => ()
end match
end onStateChange
Expand Down
8 changes: 4 additions & 4 deletions core/src/test/scala/ox/resilience/CircuitBreakerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -133,18 +133,18 @@ class CircuitBreakerTest extends AnyFlatSpec with Matchers with OptionValues wit
minimumNumberOfCalls = numberOfOperations,
slidingWindow = SlidingWindow.CountBased(numberOfOperations),
numberOfCallsInHalfOpenState = 1,
waitDurationOpenState = 10.millis,
halfOpenTimeoutDuration = 1.second
waitDurationOpenState = 1.seconds,
halfOpenTimeoutDuration = 2.seconds
)
)
def f(): Either[String, String] =
Left("boom")

// when
val result1 = circuitBreaker.runOrDropEither(f()) // trigger swithc to open
sleep(100.millis) // wait for state to register, and for switch to half open
sleep(1500.millis) // wait for state to register, and for switch to half open
val state = circuitBreaker.stateMachine.state
sleep(1500.millis) // wait longer than half open timeout
sleep(2500.millis) // wait longer than half open timeout
val stateAfterWait = circuitBreaker.stateMachine.state

// then
Expand Down

0 comments on commit f42e74d

Please sign in to comment.