Skip to content

Commit

Permalink
Circuit breaker (#362)
Browse files Browse the repository at this point in the history
* circuit breaker: consecutive and experimental rate breaker

* route specific breaker settings

* WIP notes

* refactor the circuit breaker package

* optional individual ttl for each breaker

* test host and route specific settings

* breaker configuration from command line

* add documentation

* docs reviewed

* remove sorted access list

* log open and close events

* drop unused fields
  • Loading branch information
aryszka authored Jul 13, 2017
1 parent 6e629d1 commit a6f2131
Show file tree
Hide file tree
Showing 23 changed files with 2,843 additions and 89 deletions.
62 changes: 62 additions & 0 deletions circuit/binarysampler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package circuit

// binarysampler contains a series of events as 0 or 1 values, e.g. errors or successes,
// within a limited, sliding window.
// count contains the actual number of events with the value of 1 within the window.
// it compresses the event storage by 64.
type binarySampler struct {
size int
filled int
frames []uint64
pad uint64
count int
}

func newBinarySampler(size int) *binarySampler {
if size <= 0 {
size = 1
}

return &binarySampler{
size: size,
pad: 64 - uint64(size)%64,
}
}

func highestSet(frame, pad uint64) bool {
return frame&(1<<(63-pad)) != 0
}

func shift(frames []uint64) {
highestFrame := len(frames) - 1
for i := highestFrame; i >= 0; i-- {
h := highestSet(frames[i], 0)
frames[i] = frames[i] << 1
if h && i < highestFrame {
frames[i+1] |= 1
}
}
}

func (s *binarySampler) tick(set bool) {
filled := s.filled == s.size

if filled && highestSet(s.frames[len(s.frames)-1], s.pad) {
s.count--
}

if !filled {
if len(s.frames) <= s.filled/64 {
s.frames = append(s.frames, 0)
}

s.filled++
}

shift(s.frames)

if set {
s.count++
s.frames[0] |= 1
}
}
77 changes: 77 additions & 0 deletions circuit/binarysampler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package circuit

import "testing"

func TestBinarySampler(t *testing.T) {
expectCount := func(t *testing.T, s *binarySampler, c int) {
if s.count != c {
t.Errorf("unexpected count, got: %d, expected: %d", s.count, c)
}
}

t.Run("wrong init arg defaults to 1", func(t *testing.T) {
s := newBinarySampler(-3)
expectCount(t, s, 0)
s.tick(true)
expectCount(t, s, 1)
s.tick(true)
expectCount(t, s, 1)
})

t.Run("returns right count when not filled", func(t *testing.T) {
s := newBinarySampler(6)
s.tick(true)
s.tick(false)
s.tick(true)
expectCount(t, s, 2)
})

t.Run("returns right count after filled", func(t *testing.T) {
s := newBinarySampler(3)
s.tick(false)
s.tick(true)
s.tick(false)
s.tick(true)
expectCount(t, s, 2)
})

t.Run("shifts the reservoir when filled", func(t *testing.T) {
s := newBinarySampler(3)
s.tick(true)
s.tick(false)
s.tick(true)
s.tick(false)
expectCount(t, s, 1)
})

t.Run("shifts through multiple frames", func(t *testing.T) {
const size = 314
s := newBinarySampler(size)

for i := 0; i < size+size/2; i++ {
s.tick(true)
}

expectCount(t, s, size)
})

t.Run("uses the right 'amount of memory'", func(t *testing.T) {
const size = 314
s := newBinarySampler(size)
for i := 0; i < size+size/2; i++ {
s.tick(true)
}

expectedFrames := size / 64
if size%64 > 0 {
expectedFrames++
}

if len(s.frames) != expectedFrames {
t.Errorf(
"unexpected number of frames, got: %d, expected: %d",
len(s.frames), expectedFrames,
)
}
})
}
148 changes: 148 additions & 0 deletions circuit/breaker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package circuit

import (
"strconv"
"strings"
"time"
)

// BreakerType defines the type of the used breaker: consecutive, rate or disabled.
type BreakerType int

const (
BreakerNone BreakerType = iota
ConsecutiveFailures
FailureRate
BreakerDisabled
)

// BreakerSettings contains the settings for individual circuit breakers.
//
// See the package overview for the detailed merging/overriding rules of the settings and for the meaning of the
// individual fields.
type BreakerSettings struct {
Type BreakerType
Host string
Window, Failures int
Timeout time.Duration
HalfOpenRequests int
IdleTTL time.Duration
}

type breakerImplementation interface {
Allow() (func(bool), bool)
}

type voidBreaker struct{}

// Breaker represents a single circuit breaker for a particular set of settings.
//
// Use the Get() method of the Registry to request fully initialized breakers.
type Breaker struct {
settings BreakerSettings
ts time.Time
impl breakerImplementation
}

func (to BreakerSettings) mergeSettings(from BreakerSettings) BreakerSettings {
if to.Type == BreakerNone {
to.Type = from.Type

if from.Type == ConsecutiveFailures {
to.Failures = from.Failures
}

if from.Type == FailureRate {
to.Window = from.Window
to.Failures = from.Failures
}
}

if to.Timeout == 0 {
to.Timeout = from.Timeout
}

if to.HalfOpenRequests == 0 {
to.HalfOpenRequests = from.HalfOpenRequests
}

if to.IdleTTL == 0 {
to.IdleTTL = from.IdleTTL
}

return to
}

// String returns the string representation of a particular set of settings.
func (s BreakerSettings) String() string {
var ss []string

switch s.Type {
case ConsecutiveFailures:
ss = append(ss, "type=consecutive")
case FailureRate:
ss = append(ss, "type=rate")
case BreakerDisabled:
return "disabled"
default:
return "none"
}

if s.Host != "" {
ss = append(ss, "host="+s.Host)
}

if s.Type == FailureRate && s.Window > 0 {
ss = append(ss, "window="+strconv.Itoa(s.Window))
}

if s.Failures > 0 {
ss = append(ss, "failures="+strconv.Itoa(s.Failures))
}

if s.Timeout > 0 {
ss = append(ss, "timeout="+s.Timeout.String())
}

if s.HalfOpenRequests > 0 {
ss = append(ss, "half-open-requests="+strconv.Itoa(s.HalfOpenRequests))
}

if s.IdleTTL > 0 {
ss = append(ss, "idle-ttl="+s.IdleTTL.String())
}

return strings.Join(ss, ",")
}

func (b voidBreaker) Allow() (func(bool), bool) {
return func(bool) {}, true
}

func newBreaker(s BreakerSettings) *Breaker {
var impl breakerImplementation
switch s.Type {
case ConsecutiveFailures:
impl = newConsecutive(s)
case FailureRate:
impl = newRate(s)
default:
impl = voidBreaker{}
}

return &Breaker{
settings: s,
impl: impl,
}
}

// Allow returns true if the breaker is in the closed state and a callback function for reporting the outcome of
// the operation. The callback expects true values if the outcome of the request was successful. Allow may not
// return a callback function when the state is open.
func (b *Breaker) Allow() (func(bool), bool) {
return b.impl.Allow()
}

func (b *Breaker) idle(now time.Time) bool {
return now.Sub(b.ts) > b.settings.IdleTTL
}
Loading

0 comments on commit a6f2131

Please sign in to comment.