diff --git a/circuit/binarysampler.go b/circuit/binarysampler.go new file mode 100644 index 0000000000..7c040211c2 --- /dev/null +++ b/circuit/binarysampler.go @@ -0,0 +1,62 @@ +package circuit + +// binarysampler contains a series of events as 0 or 1 values, e.g. errors or successes, +// within a limited, sliding window. +// count contains the actual number of events with the value of 1 within the window. +// it compresses the event storage by 64. +type binarySampler struct { + size int + filled int + frames []uint64 + pad uint64 + count int +} + +func newBinarySampler(size int) *binarySampler { + if size <= 0 { + size = 1 + } + + return &binarySampler{ + size: size, + pad: 64 - uint64(size)%64, + } +} + +func highestSet(frame, pad uint64) bool { + return frame&(1<<(63-pad)) != 0 +} + +func shift(frames []uint64) { + highestFrame := len(frames) - 1 + for i := highestFrame; i >= 0; i-- { + h := highestSet(frames[i], 0) + frames[i] = frames[i] << 1 + if h && i < highestFrame { + frames[i+1] |= 1 + } + } +} + +func (s *binarySampler) tick(set bool) { + filled := s.filled == s.size + + if filled && highestSet(s.frames[len(s.frames)-1], s.pad) { + s.count-- + } + + if !filled { + if len(s.frames) <= s.filled/64 { + s.frames = append(s.frames, 0) + } + + s.filled++ + } + + shift(s.frames) + + if set { + s.count++ + s.frames[0] |= 1 + } +} diff --git a/circuit/binarysampler_test.go b/circuit/binarysampler_test.go new file mode 100644 index 0000000000..1474632db6 --- /dev/null +++ b/circuit/binarysampler_test.go @@ -0,0 +1,77 @@ +package circuit + +import "testing" + +func TestBinarySampler(t *testing.T) { + expectCount := func(t *testing.T, s *binarySampler, c int) { + if s.count != c { + t.Errorf("unexpected count, got: %d, expected: %d", s.count, c) + } + } + + t.Run("wrong init arg defaults to 1", func(t *testing.T) { + s := newBinarySampler(-3) + expectCount(t, s, 0) + s.tick(true) + expectCount(t, s, 1) + s.tick(true) + expectCount(t, s, 1) + }) + + t.Run("returns right count when not filled", func(t *testing.T) { + s := newBinarySampler(6) + s.tick(true) + s.tick(false) + s.tick(true) + expectCount(t, s, 2) + }) + + t.Run("returns right count after filled", func(t *testing.T) { + s := newBinarySampler(3) + s.tick(false) + s.tick(true) + s.tick(false) + s.tick(true) + expectCount(t, s, 2) + }) + + t.Run("shifts the reservoir when filled", func(t *testing.T) { + s := newBinarySampler(3) + s.tick(true) + s.tick(false) + s.tick(true) + s.tick(false) + expectCount(t, s, 1) + }) + + t.Run("shifts through multiple frames", func(t *testing.T) { + const size = 314 + s := newBinarySampler(size) + + for i := 0; i < size+size/2; i++ { + s.tick(true) + } + + expectCount(t, s, size) + }) + + t.Run("uses the right 'amount of memory'", func(t *testing.T) { + const size = 314 + s := newBinarySampler(size) + for i := 0; i < size+size/2; i++ { + s.tick(true) + } + + expectedFrames := size / 64 + if size%64 > 0 { + expectedFrames++ + } + + if len(s.frames) != expectedFrames { + t.Errorf( + "unexpected number of frames, got: %d, expected: %d", + len(s.frames), expectedFrames, + ) + } + }) +} diff --git a/circuit/breaker.go b/circuit/breaker.go new file mode 100644 index 0000000000..1136c16bee --- /dev/null +++ b/circuit/breaker.go @@ -0,0 +1,148 @@ +package circuit + +import ( + "strconv" + "strings" + "time" +) + +// BreakerType defines the type of the used breaker: consecutive, rate or disabled. +type BreakerType int + +const ( + BreakerNone BreakerType = iota + ConsecutiveFailures + FailureRate + BreakerDisabled +) + +// BreakerSettings contains the settings for individual circuit breakers. +// +// See the package overview for the detailed merging/overriding rules of the settings and for the meaning of the +// individual fields. +type BreakerSettings struct { + Type BreakerType + Host string + Window, Failures int + Timeout time.Duration + HalfOpenRequests int + IdleTTL time.Duration +} + +type breakerImplementation interface { + Allow() (func(bool), bool) +} + +type voidBreaker struct{} + +// Breaker represents a single circuit breaker for a particular set of settings. +// +// Use the Get() method of the Registry to request fully initialized breakers. +type Breaker struct { + settings BreakerSettings + ts time.Time + impl breakerImplementation +} + +func (to BreakerSettings) mergeSettings(from BreakerSettings) BreakerSettings { + if to.Type == BreakerNone { + to.Type = from.Type + + if from.Type == ConsecutiveFailures { + to.Failures = from.Failures + } + + if from.Type == FailureRate { + to.Window = from.Window + to.Failures = from.Failures + } + } + + if to.Timeout == 0 { + to.Timeout = from.Timeout + } + + if to.HalfOpenRequests == 0 { + to.HalfOpenRequests = from.HalfOpenRequests + } + + if to.IdleTTL == 0 { + to.IdleTTL = from.IdleTTL + } + + return to +} + +// String returns the string representation of a particular set of settings. +func (s BreakerSettings) String() string { + var ss []string + + switch s.Type { + case ConsecutiveFailures: + ss = append(ss, "type=consecutive") + case FailureRate: + ss = append(ss, "type=rate") + case BreakerDisabled: + return "disabled" + default: + return "none" + } + + if s.Host != "" { + ss = append(ss, "host="+s.Host) + } + + if s.Type == FailureRate && s.Window > 0 { + ss = append(ss, "window="+strconv.Itoa(s.Window)) + } + + if s.Failures > 0 { + ss = append(ss, "failures="+strconv.Itoa(s.Failures)) + } + + if s.Timeout > 0 { + ss = append(ss, "timeout="+s.Timeout.String()) + } + + if s.HalfOpenRequests > 0 { + ss = append(ss, "half-open-requests="+strconv.Itoa(s.HalfOpenRequests)) + } + + if s.IdleTTL > 0 { + ss = append(ss, "idle-ttl="+s.IdleTTL.String()) + } + + return strings.Join(ss, ",") +} + +func (b voidBreaker) Allow() (func(bool), bool) { + return func(bool) {}, true +} + +func newBreaker(s BreakerSettings) *Breaker { + var impl breakerImplementation + switch s.Type { + case ConsecutiveFailures: + impl = newConsecutive(s) + case FailureRate: + impl = newRate(s) + default: + impl = voidBreaker{} + } + + return &Breaker{ + settings: s, + impl: impl, + } +} + +// Allow returns true if the breaker is in the closed state and a callback function for reporting the outcome of +// the operation. The callback expects true values if the outcome of the request was successful. Allow may not +// return a callback function when the state is open. +func (b *Breaker) Allow() (func(bool), bool) { + return b.impl.Allow() +} + +func (b *Breaker) idle(now time.Time) bool { + return now.Sub(b.ts) > b.settings.IdleTTL +} diff --git a/circuit/breaker_test.go b/circuit/breaker_test.go new file mode 100644 index 0000000000..f8bc2693d0 --- /dev/null +++ b/circuit/breaker_test.go @@ -0,0 +1,205 @@ +package circuit + +import ( + "math/rand" + "testing" + "time" +) + +func times(n int, f func()) { + for n > 0 { + f() + n-- + } +} + +func createDone(t *testing.T, success bool, b *Breaker) func() { + return func() { + if t.Failed() { + return + } + + done, ok := b.Allow() + if !ok { + t.Error("breaker is unexpectedly open") + return + } + + done(success) + } +} + +func succeed(t *testing.T, b *Breaker) func() { return createDone(t, true, b) } +func fail(t *testing.T, b *Breaker) func() { return createDone(t, false, b) } +func failOnce(t *testing.T, b *Breaker) { fail(t, b)() } + +func checkClosed(t *testing.T, b *Breaker) { + if _, ok := b.Allow(); !ok { + t.Error("breaker is not closed") + } +} + +func checkOpen(t *testing.T, b *Breaker) { + if _, ok := b.Allow(); ok { + t.Error("breaker is not open") + } +} + +func TestConsecutiveFailures(t *testing.T) { + s := BreakerSettings{ + Type: ConsecutiveFailures, + Failures: 3, + HalfOpenRequests: 3, + Timeout: 3 * time.Millisecond, + } + + waitTimeout := func() { + time.Sleep(s.Timeout) + } + + t.Run("new breaker closed", func(t *testing.T) { + b := newBreaker(s) + checkClosed(t, b) + }) + + t.Run("does not open on not enough failures", func(t *testing.T) { + b := newBreaker(s) + times(s.Failures-1, fail(t, b)) + checkClosed(t, b) + }) + + t.Run("open on failures", func(t *testing.T) { + b := newBreaker(s) + times(s.Failures, fail(t, b)) + checkOpen(t, b) + }) + + t.Run("go half open, close after required successes", func(t *testing.T) { + b := newBreaker(s) + times(s.Failures, fail(t, b)) + waitTimeout() + times(s.HalfOpenRequests, succeed(t, b)) + checkClosed(t, b) + }) + + t.Run("go half open, reopen after a fail within the required successes", func(t *testing.T) { + b := newBreaker(s) + times(s.Failures, fail(t, b)) + waitTimeout() + times(s.HalfOpenRequests-1, succeed(t, b)) + failOnce(t, b) + checkOpen(t, b) + }) +} + +func TestRateBreaker(t *testing.T) { + s := BreakerSettings{ + Type: FailureRate, + Window: 6, + Failures: 3, + HalfOpenRequests: 3, + Timeout: 3 * time.Millisecond, + } + + t.Run("new breaker closed", func(t *testing.T) { + b := newBreaker(s) + checkClosed(t, b) + }) + + t.Run("doesn't open if failure count is not within a window", func(t *testing.T) { + b := newBreaker(s) + times(1, fail(t, b)) + times(2, succeed(t, b)) + checkClosed(t, b) + times(1, fail(t, b)) + times(2, succeed(t, b)) + checkClosed(t, b) + times(1, fail(t, b)) + times(2, succeed(t, b)) + checkClosed(t, b) + }) + + t.Run("opens on reaching the rate", func(t *testing.T) { + b := newBreaker(s) + times(s.Window, succeed(t, b)) + times(s.Failures, fail(t, b)) + checkOpen(t, b) + }) +} + +// no checks, used for race detector +func TestRateBreakerFuzzy(t *testing.T) { + if testing.Short() { + t.Skip() + } + + const ( + concurrentRequests = 64 + requestDuration = 6 * time.Microsecond + requestDelay = 6 * time.Microsecond + duration = 3 * time.Second + ) + + s := BreakerSettings{ + Type: FailureRate, + Window: 300, + Failures: 120, + HalfOpenRequests: 12, + Timeout: 3 * time.Millisecond, + } + + b := newBreaker(s) + + stop := make(chan struct{}) + + successChance := func() bool { + return rand.Intn(s.Window) > s.Failures + } + + runAgent := func() { + for { + select { + case <-stop: + default: + } + + done, ok := b.Allow() + time.Sleep(requestDuration) + if ok { + done(successChance()) + } + + time.Sleep(requestDelay) + } + } + + time.AfterFunc(duration, func() { + close(stop) + }) + + for i := 0; i < concurrentRequests; i++ { + go runAgent() + } + + <-stop +} + +func TestSettingsString(t *testing.T) { + s := BreakerSettings{ + Type: FailureRate, + Host: "www.example.org", + Failures: 30, + Window: 300, + Timeout: time.Minute, + HalfOpenRequests: 15, + IdleTTL: time.Hour, + } + + ss := s.String() + expect := "type=rate,host=www.example.org,window=300,failures=30,timeout=1m0s,half-open-requests=15,idle-ttl=1h0m0s" + if ss != expect { + t.Error("invalid breaker settings string") + t.Logf("got : %s", ss) + t.Logf("expected: %s", expect) + } +} diff --git a/circuit/consecutivebreaker.go b/circuit/consecutivebreaker.go new file mode 100644 index 0000000000..377265da33 --- /dev/null +++ b/circuit/consecutivebreaker.go @@ -0,0 +1,54 @@ +package circuit + +import ( + log "github.com/sirupsen/logrus" + "github.com/sony/gobreaker" +) + +type consecutiveBreaker struct { + settings BreakerSettings + open bool + gb *gobreaker.TwoStepCircuitBreaker +} + +func newConsecutive(s BreakerSettings) *consecutiveBreaker { + b := &consecutiveBreaker{ + settings: s, + } + + b.gb = gobreaker.NewTwoStepCircuitBreaker(gobreaker.Settings{ + Name: s.Host, + MaxRequests: uint32(s.HalfOpenRequests), + Timeout: s.Timeout, + ReadyToTrip: b.readyToTrip, + }) + + return b +} + +func (b *consecutiveBreaker) readyToTrip(c gobreaker.Counts) bool { + b.open = int(c.ConsecutiveFailures) >= b.settings.Failures + if b.open { + log.Infof("circuit breaker open: %v", b.settings) + } + + return b.open +} + +func (b *consecutiveBreaker) Allow() (func(bool), bool) { + done, err := b.gb.Allow() + + // this error can only indicate that the breaker is not closed + closed := err == nil + + if !closed { + return nil, false + } + + if b.open { + b.open = false + log.Infof("circuit breaker closed: %v", b.settings) + } + + return done, true +} diff --git a/circuit/doc.go b/circuit/doc.go new file mode 100644 index 0000000000..70c37a4965 --- /dev/null +++ b/circuit/doc.go @@ -0,0 +1,152 @@ +/* +Package circuit implements circuit breaker functionality for the proxy. + +It provides two types of circuit breakers: consecutive and failure rate based. The circuit breakers can be +configured either globally, based on hosts or individual routes. The registry ensures synchronized access to the +active breakers and the recycling of the idle ones. + +The circuit breakers are always assigned to backend hosts, so that the outcome of requests to one host never +affects the circuit breaker behavior of another host. Besides hosts, individual routes can have separate circuit +breakers, too. + +Breaker Type - Consecutive Failures + +This breaker opens when the proxy couldn't connect to a backend or received a >=500 status code at least N times +in a row. When open, the proxy returns 503 - Service Unavailable response during the breaker timeout. After this +timeout, the breaker goes into half-open state, in which it expects that M number of requests succeed. The +requests in the half-open state are accepted concurrently. If any of the requests during the half-open state +fails, the breaker goes back to open state. If all succeed, it goes to closed state again. + +Breaker Type - Failure Rate + +The "rate breaker" works similar to the "consecutive breaker", but instead of considering N consecutive failures +for going open, it maintains a sliding window of the last M events, both successes and failures, and opens only +when the number of failures reaches N within the window. This way the sliding window is not time based and +allows the same breaker characteristics for low and high rate traffic. + +Usage + +When imported as a package, the Registry can be used to hold the circuit breakers and their settings. On a +higher level, the circuit breaker settings can be simply passed to skipper as part of the skipper.Options +object, or, equivalently, defined as command line flags. + +The following command starts skipper with a global consecutive breaker that opens after 5 failures for any +backend host: + + skipper -breaker type=consecutive,failures=5 + +To set only the type of the breaker globally, and the rates individually for the hosts: + + skipper -breaker type=rate,timeout=3m,idleTTL=30m \ + -breaker host=foo.example.org,window=300,failures=30 \ + -breaker host=bar.example.org,window=120,failures=45 + +To enable circuit breakers only for specific hosts: + + skipper -breaker type=disabled \ + -breaker type=rate,host=foo.example.org,window=300,failures=30 + +To change (or set) the breaker configurations for an individual route and disable for another, in eskip: + + updates: Method("POST") && Host("foo.example.org") + -> consecutiveBreaker(9) + -> "https://foo.backend.net"; + + backendHealthcheck: Path("/healthcheck") + -> disableBreaker() + -> "https://foo.backend.net"; + +The breaker settings can be defined in the following levels: global, based on the backend host, based on +individual route settings. The values are merged in the same order, so the global settings serve as defaults for +the host settings, and the result of the global and host settings serve as defaults for the route settings. +Setting global values happens the same way as setting host values, but leaving the Host field empty. Setting +route based values happens with filters in the route definitions. +(https://godoc.org/github.com/zalando/skipper/filters/circuit) + +Settings - Type + +It can be ConsecutiveFailures, FailureRate or Disabled, where the first two values select which breaker to use, +while the Disabled value can override a global or host configuration disabling the circuit breaker for the +specific host or route. + +Command line name: type. Possible command line values: consecutive, rate, disabled. + +Settings - Host + +The Host field indicates to which backend host should the current set of settings be applied. Leaving it empty +indicates global settings. + +Command line name: host. + +Settings - Window + +The window value sets the size of the sliding counter window of the failure rate breaker. + +Command line name: window. Possible command line values: any positive integer. + +Settings - Failures + +The failures value sets the max failure count for both the "consecutive" and "rate" breakers. + +Command line name: failures. Possible command line values: any positive integer. + +Settings - Timeout + +With the timeout we can set how long the breaker should stay open, before becoming half-open. + +Command line name: timeout. Possible command line values: any positive integer as milliseconds or a duration +string, e.g. 15m30s. + +Settings - Half-Open Requests + +Defines the number of requests expected to succeed while the circuit breaker is in the half-open state. + +Command line name: half-open-requests. Possible command line values: any positive integer. + +Settings - Idle TTL + +Defines the idle timeout after which a circuit breaker gets recycled, if it hasn't been used. + +Command line name: idle-ttl. Possible command line values: any positive integer as milliseconds or a duration +string, e.g. 15m30s. + +Filters + +The following circuit breaker filters are supported: consecutiveBreaker(), rateBreaker() and disableBreaker(). + +The consecutiveBreaker filter expects one mandatory parameter: the number of consecutive failures to open. It +accepts the following optional arguments: timeout, half-open requests, idle-ttl. + + consecutiveBreaker(5, "1m", 12, "30m") + +The rateBreaker filter expects two mandatory parameters: the number of consecutive failures to open and the size +of the sliding window. It accepts the following optional arguments: timeout, half-open requests, idle-ttl. + + rateBreaker(30, 300, "1m", 12, "30m") + +The disableBreaker filter doesn't expect any arguments, and it disables the circuit breaker, if any, for the +route that it appears in. + + disableBreaker() + +Proxy Usage + +The proxy, when circuit breakers are configured, uses them for backend connections. It checks the breaker for +the current backend host if it's closed before making backend requests. It reports the outcome of the request to +the breaker, considering connection failures and backend responses with status code >=500 as failures. When the +breaker is open, the proxy doesn't try to make backend requests, and returns a response with a status code of +503 and appending a header to the response: + + X-Circuit-Open: true + +Registry + +The active circuit breakers are stored in a registry. They are created on-demand, for the requested settings. +The registry synchronizes access to the shared circuit breakers. When the registry detects that a circuit +breaker is idle, it resets it, this way avoiding that an old series of failures would cause the circuit breaker +go open after an unreasonably low number of recent failures. The registry also makes sure to cleanup idle +circuit breakers that are not requested anymore by the proxy. This happens in a passive way, whenever a new +circuit breaker is created. The cleanup prevents storing circuit breakers for inaccessible backend hosts +infinitely in those scenarios where the route configuration is continuously changing. +*/ +package circuit diff --git a/circuit/ratebreaker.go b/circuit/ratebreaker.go new file mode 100644 index 0000000000..321d399b69 --- /dev/null +++ b/circuit/ratebreaker.go @@ -0,0 +1,87 @@ +package circuit + +import ( + log "github.com/sirupsen/logrus" + "sync" + + "github.com/sony/gobreaker" +) + +// TODO: +// in case of the rate breaker, there are unnecessary synchronization steps due to the 3rd party gobreaker. If +// the sliding window was part of the implementation of the individual breakers, this additional syncrhonization +// would not be required. + +type rateBreaker struct { + settings BreakerSettings + open bool + mx *sync.Mutex + sampler *binarySampler + gb *gobreaker.TwoStepCircuitBreaker +} + +func newRate(s BreakerSettings) *rateBreaker { + b := &rateBreaker{ + settings: s, + mx: &sync.Mutex{}, + } + + b.gb = gobreaker.NewTwoStepCircuitBreaker(gobreaker.Settings{ + Name: s.Host, + MaxRequests: uint32(s.HalfOpenRequests), + Timeout: s.Timeout, + ReadyToTrip: func(gobreaker.Counts) bool { return b.readyToTrip() }, + }) + + return b +} + +func (b *rateBreaker) readyToTrip() bool { + b.mx.Lock() + defer b.mx.Unlock() + + if b.sampler == nil { + return false + } + + b.open = b.sampler.count >= b.settings.Failures + if b.open { + log.Infof("circuit breaker open: %v", b.settings) + b.sampler = nil + } + + return b.open +} + +// count the failures in closed and half-open state +func (b *rateBreaker) countRate(success bool) { + b.mx.Lock() + defer b.mx.Unlock() + + if b.sampler == nil { + b.sampler = newBinarySampler(b.settings.Window) + } + + b.sampler.tick(!success) +} + +func (b *rateBreaker) Allow() (func(bool), bool) { + done, err := b.gb.Allow() + + // this error can only indicate that the breaker is not closed + closed := err == nil + + if !closed { + return nil, false + } + + if b.open { + b.open = false + log.Infof("circuit breaker closed: %v", b.settings) + } + + return func(success bool) { + b.countRate(success) + done(success) + }, true +} diff --git a/circuit/registry.go b/circuit/registry.go new file mode 100644 index 0000000000..d403e31e54 --- /dev/null +++ b/circuit/registry.go @@ -0,0 +1,115 @@ +package circuit + +import ( + "sync" + "time" +) + +const DefaultIdleTTL = time.Hour + +// Registry objects hold the active circuit breakers, ensure synchronized access to them, apply default settings +// and recycle the idle breakers. +type Registry struct { + defaults BreakerSettings + hostSettings map[string]BreakerSettings + lookup map[BreakerSettings]*Breaker + mx *sync.Mutex +} + +// NewRegistry initializes a registry with the provided default settings. Settings with an empty Host field are +// considered as defaults. Settings with the same Host field are merged together. +func NewRegistry(settings ...BreakerSettings) *Registry { + var ( + defaults BreakerSettings + hostSettings []BreakerSettings + ) + + for _, s := range settings { + if s.Host == "" { + defaults = defaults.mergeSettings(s) + continue + } + + hostSettings = append(hostSettings, s) + } + + if defaults.IdleTTL <= 0 { + defaults.IdleTTL = DefaultIdleTTL + } + + hs := make(map[string]BreakerSettings) + for _, s := range hostSettings { + if sh, ok := hs[s.Host]; ok { + hs[s.Host] = s.mergeSettings(sh) + } else { + hs[s.Host] = s.mergeSettings(defaults) + } + } + + return &Registry{ + defaults: defaults, + hostSettings: hs, + lookup: make(map[BreakerSettings]*Breaker), + mx: &sync.Mutex{}, + } +} + +func (r *Registry) mergeDefaults(s BreakerSettings) BreakerSettings { + defaults, ok := r.hostSettings[s.Host] + if !ok { + defaults = r.defaults + } + + return s.mergeSettings(defaults) +} + +func (r *Registry) dropIdle(now time.Time) { + for h, b := range r.lookup { + if b.idle(now) { + delete(r.lookup, h) + } + } +} + +func (r *Registry) get(s BreakerSettings) *Breaker { + r.mx.Lock() + defer r.mx.Unlock() + + now := time.Now() + + b, ok := r.lookup[s] + if !ok || b.idle(now) { + // check if there is any other to evict, evict if yes + r.dropIdle(now) + + // create a new one + b = newBreaker(s) + r.lookup[s] = b + } + + // set the access timestamp + b.ts = now + + return b +} + +// Get returns a circuit breaker for the provided settings. The BreakerSettings object is used here as a key, +// but typically it is enough to just set its Host field: +// +// r.Get(BreakerSettings{Host: backendHost}) +// +// The key will be filled up with the defaults and the matching circuit breaker will be returned if it exists, +// or a new one will be created if not. +func (r *Registry) Get(s BreakerSettings) *Breaker { + // we check for host, because we don't want to use shared global breakers + if s.Type == BreakerDisabled || s.Host == "" { + return nil + } + + s = r.mergeDefaults(s) + if s.Type == BreakerNone { + return nil + } + + return r.get(s) +} diff --git a/circuit/registry_test.go b/circuit/registry_test.go new file mode 100644 index 0000000000..d2888aa335 --- /dev/null +++ b/circuit/registry_test.go @@ -0,0 +1,471 @@ +package circuit + +import ( + "math/rand" + "testing" + "time" +) + +// no checks, used for race detector +func TestRegistry(t *testing.T) { + createSettings := func(cf int) BreakerSettings { + return BreakerSettings{ + Type: ConsecutiveFailures, + Failures: cf, + IdleTTL: DefaultIdleTTL, + } + } + + createHostSettings := func(h string, cf int) BreakerSettings { + s := createSettings(cf) + s.Host = h + return s + } + + createDisabledSettings := func() BreakerSettings { + return BreakerSettings{Type: BreakerDisabled} + } + + checkNil := func(t *testing.T, b *Breaker) { + if b != nil { + t.Error("unexpected breaker") + } + } + + checkNotNil := func(t *testing.T, b *Breaker) { + if b == nil { + t.Error("failed to receive a breaker") + } + } + + checkSettings := func(t *testing.T, left, right BreakerSettings) { + if left != right { + t.Error("failed to receive breaker with the right settings") + t.Log(left) + t.Log(right) + } + } + + checkWithoutHost := func(t *testing.T, b *Breaker, s BreakerSettings) { + checkNotNil(t, b) + sb := b.settings + sb.Host = "" + checkSettings(t, sb, s) + } + + checkWithHost := func(t *testing.T, b *Breaker, s BreakerSettings) { + checkNotNil(t, b) + checkSettings(t, b.settings, s) + } + + t.Run("no settings", func(t *testing.T) { + r := NewRegistry() + + b := r.Get(BreakerSettings{Host: "foo"}) + checkNil(t, b) + }) + + t.Run("only default settings", func(t *testing.T) { + d := createSettings(5) + r := NewRegistry(d) + + b := r.Get(BreakerSettings{Host: "foo"}) + checkWithoutHost(t, b, r.defaults) + }) + + t.Run("only host settings", func(t *testing.T) { + h0 := createHostSettings("foo", 5) + h1 := createHostSettings("bar", 5) + r := NewRegistry(h0, h1) + + b := r.Get(BreakerSettings{Host: "foo"}) + checkWithHost(t, b, h0) + + b = r.Get(BreakerSettings{Host: "bar"}) + checkWithHost(t, b, h1) + + b = r.Get(BreakerSettings{Host: "baz"}) + checkNil(t, b) + }) + + t.Run("default and host settings", func(t *testing.T) { + d := createSettings(5) + h0 := createHostSettings("foo", 5) + h1 := createHostSettings("bar", 5) + r := NewRegistry(d, h0, h1) + + b := r.Get(BreakerSettings{Host: "foo"}) + checkWithHost(t, b, h0) + + b = r.Get(BreakerSettings{Host: "bar"}) + checkWithHost(t, b, h1) + + b = r.Get(BreakerSettings{Host: "baz"}) + checkWithoutHost(t, b, d) + }) + + t.Run("only custom settings", func(t *testing.T) { + r := NewRegistry() + + cs := createHostSettings("foo", 15) + b := r.Get(cs) + checkWithHost(t, b, cs) + }) + + t.Run("only default settings, with custom", func(t *testing.T) { + d := createSettings(5) + r := NewRegistry(d) + + cs := createHostSettings("foo", 15) + b := r.Get(cs) + checkWithHost(t, b, cs) + }) + + t.Run("only host settings, with custom", func(t *testing.T) { + h0 := createHostSettings("foo", 5) + h1 := createHostSettings("bar", 5) + r := NewRegistry(h0, h1) + + cs := createHostSettings("foo", 15) + b := r.Get(cs) + checkWithHost(t, b, cs) + + cs = createHostSettings("bar", 15) + b = r.Get(cs) + checkWithHost(t, b, cs) + + cs = createHostSettings("baz", 15) + b = r.Get(cs) + checkWithHost(t, b, cs) + }) + + t.Run("default and host settings, with custom", func(t *testing.T) { + d := createSettings(5) + h0 := createHostSettings("foo", 5) + h1 := createHostSettings("bar", 5) + r := NewRegistry(d, h0, h1) + + cs := createHostSettings("foo", 15) + b := r.Get(cs) + checkWithHost(t, b, cs) + + cs = createHostSettings("bar", 15) + b = r.Get(cs) + checkWithHost(t, b, cs) + + cs = createHostSettings("baz", 15) + b = r.Get(cs) + checkWithHost(t, b, cs) + }) + + t.Run("no settings and disabled", func(t *testing.T) { + r := NewRegistry() + + b := r.Get(createDisabledSettings()) + checkNil(t, b) + }) + + t.Run("only default settings, disabled", func(t *testing.T) { + d := createSettings(5) + r := NewRegistry(d) + + b := r.Get(createDisabledSettings()) + checkNil(t, b) + }) + + t.Run("only host settings, disabled", func(t *testing.T) { + h0 := createHostSettings("foo", 5) + h1 := createHostSettings("bar", 5) + r := NewRegistry(h0, h1) + + b := r.Get(createDisabledSettings()) + checkNil(t, b) + }) + + t.Run("default and host settings, disabled", func(t *testing.T) { + d := createSettings(5) + h0 := createHostSettings("foo", 5) + h1 := createHostSettings("bar", 5) + r := NewRegistry(d, h0, h1) + + b := r.Get(createDisabledSettings()) + checkNil(t, b) + }) +} + +func TestRegistryEvictIdle(t *testing.T) { + if testing.Short() { + t.Skip() + } + + settings := []BreakerSettings{{ + IdleTTL: 15 * time.Millisecond, + }, { + Host: "foo", + Type: ConsecutiveFailures, + Failures: 4, + }, { + Host: "bar", + Type: ConsecutiveFailures, + Failures: 5, + }, { + Host: "baz", + Type: ConsecutiveFailures, + Failures: 6, + }, { + Host: "qux", + Type: ConsecutiveFailures, + Failures: 7, + }} + toEvict := settings[3] + r := NewRegistry(settings...) + + get := func(host string) { + b := r.Get(BreakerSettings{Host: host}) + if b == nil { + t.Error("failed to retrieve breaker") + } + } + + get("foo") + get("bar") + get("baz") + + time.Sleep(2 * settings[0].IdleTTL / 3) + + get("foo") + get("bar") + + time.Sleep(2 * settings[0].IdleTTL / 3) + + get("qux") + + if len(r.lookup) != 3 || r.lookup[toEvict] != nil { + t.Error("failed to evict breaker from lookup") + return + } + + for s := range r.lookup { + if s.Host == "baz" { + t.Error("failed to evict idle breaker") + return + } + } +} + +func TestIndividualIdle(t *testing.T) { + if testing.Short() { + t.Skip() + } + + const ( + consecutiveFailures = 5 + idleTimeout = 15 * time.Millisecond + hostIdleTimeout = 6 * time.Millisecond + ) + + r := NewRegistry( + BreakerSettings{ + Type: ConsecutiveFailures, + Failures: consecutiveFailures, + IdleTTL: idleTimeout, + }, + BreakerSettings{ + Host: "foo", + IdleTTL: hostIdleTimeout, + }, + ) + + shouldBeClosed := func(t *testing.T, host string) func(bool) { + b := r.Get(BreakerSettings{Host: host}) + if b == nil { + t.Error("failed get breaker") + return nil + } + + done, ok := b.Allow() + if !ok { + t.Error("breaker unexpectedly open") + return nil + } + + return done + } + + fail := func(t *testing.T, host string) { + done := shouldBeClosed(t, host) + if done != nil { + done(false) + } + } + + mkfail := func(t *testing.T, host string) func() { + return func() { + fail(t, host) + } + } + + t.Run("default", func(t *testing.T) { + times(consecutiveFailures-1, mkfail(t, "bar")) + time.Sleep(idleTimeout) + fail(t, "bar") + shouldBeClosed(t, "bar") + }) + + t.Run("host", func(t *testing.T) { + times(consecutiveFailures-1, mkfail(t, "foo")) + time.Sleep(hostIdleTimeout) + fail(t, "foo") + shouldBeClosed(t, "foo") + }) +} + +func TestRegistryFuzzy(t *testing.T) { + if testing.Short() { + t.Skip() + } + + const ( + hostCount = 1200 + customSettingsCount = 120 + concurrentRequests = 2048 + requestDurationMean = 120 * time.Microsecond + requestDurationDeviation = 60 * time.Microsecond + idleTTL = time.Second + duration = 3 * time.Second + ) + + genHost := func() string { + const ( + minHostLength = 12 + maxHostLength = 36 + ) + + h := make([]byte, minHostLength+rand.Intn(maxHostLength-minHostLength)) + for i := range h { + h[i] = 'a' + byte(rand.Intn(int('z'+1-'a'))) + } + + return string(h) + } + + hosts := make([]string, hostCount) + for i := range hosts { + hosts[i] = genHost() + } + + settings := []BreakerSettings{{IdleTTL: idleTTL}} + + settingsMap := make(map[string]BreakerSettings) + for _, h := range hosts { + s := BreakerSettings{ + Host: h, + Type: ConsecutiveFailures, + Failures: 5, + IdleTTL: idleTTL, + } + settings = append(settings, s) + settingsMap[h] = s + } + + r := NewRegistry(settings...) + + // the first customSettingsCount hosts can have corresponding custom settings + customSettings := make(map[string]BreakerSettings) + for _, h := range hosts[:customSettingsCount] { + s := settingsMap[h] + s.Failures = 15 + s.IdleTTL = idleTTL + customSettings[h] = s + } + + var syncToken struct{} + sync := make(chan struct{}, 1) + sync <- syncToken + synced := func(f func()) { + t := <-sync + f() + sync <- t + } + + replaceHostSettings := func(settings map[string]BreakerSettings, old, nu string) { + if s, ok := settings[old]; ok { + delete(settings, old) + s.Host = nu + settings[nu] = s + } + } + + replaceHost := func() { + synced(func() { + i := rand.Intn(len(hosts)) + old := hosts[i] + nu := genHost() + hosts[i] = nu + replaceHostSettings(settingsMap, old, nu) + replaceHostSettings(customSettings, old, nu) + }) + } + + stop := make(chan struct{}) + + getSettings := func(useCustom bool) BreakerSettings { + var s BreakerSettings + synced(func() { + if useCustom { + s = customSettings[hosts[rand.Intn(customSettingsCount)]] + return + } + + s = settingsMap[hosts[rand.Intn(hostCount)]] + }) + + return s + } + + requestDuration := func() time.Duration { + mean := float64(requestDurationMean) + deviation := float64(requestDurationDeviation) + return time.Duration(rand.NormFloat64()*deviation + mean) + } + + makeRequest := func(useCustom bool) { + s := getSettings(useCustom) + b := r.Get(s) + if b.settings != s { + t.Error("invalid breaker received") + t.Log(b.settings, s) + close(stop) + } + + time.Sleep(requestDuration()) + } + + runAgent := func() { + for { + select { + case <-stop: + return + default: + } + + // 1% percent chance for getting a host replaced: + if rand.Intn(100) == 0 { + replaceHost() + } + + // 3% percent of the requests is custom: + makeRequest(rand.Intn(100) < 3) + } + } + + time.AfterFunc(duration, func() { + close(stop) + }) + + for i := 0; i < concurrentRequests; i++ { + go runAgent() + } + + <-stop +} diff --git a/cmd/skipper/breakerflags.go b/cmd/skipper/breakerflags.go new file mode 100644 index 0000000000..a087899f71 --- /dev/null +++ b/cmd/skipper/breakerflags.go @@ -0,0 +1,105 @@ +package main + +import ( + "errors" + "strconv" + "strings" + + "github.com/zalando/skipper/circuit" +) + +const breakerUsage = `set global or host specific circuit breakers, e.g. -breaker type=rate,host=www.example.org,window=300,failures=30 + possible breaker properties: + type: consecutive/rate/disabled (defaults to consecutive) + host: a host name that overrides the global for a host + failures: the number of failures for consecutive or rate breakers + window: the size of the sliding window for the rate breaker + timeout: duration string or milliseconds while the breaker stays open + half-open-requests: the number of requests in half-open state to succeed before getting closed again + idle-ttl: duration string or milliseconds after the breaker is considered idle and reset + (see also: https://godoc.org/github.com/zalando/skipper/circuit)` + +type breakerFlags []circuit.BreakerSettings + +var errInvalidBreakerConfig = errors.New("invalid breaker config") + +func (b *breakerFlags) String() string { + s := make([]string, len(*b)) + for i, bi := range *b { + s[i] = bi.String() + } + + return strings.Join(s, "\n") +} + +func (b *breakerFlags) Set(value string) error { + var s circuit.BreakerSettings + + vs := strings.Split(value, ",") + for _, vi := range vs { + kv := strings.Split(vi, "=") + if len(kv) != 2 { + return errInvalidBreakerConfig + } + + switch kv[0] { + case "type": + switch kv[1] { + case "consecutive": + s.Type = circuit.ConsecutiveFailures + case "rate": + s.Type = circuit.FailureRate + case "disabled": + s.Type = circuit.BreakerDisabled + default: + return errInvalidBreakerConfig + } + case "host": + s.Host = kv[1] + case "window": + i, err := strconv.Atoi(kv[1]) + if err != nil { + return err + } + + s.Window = i + case "failures": + i, err := strconv.Atoi(kv[1]) + if err != nil { + return err + } + + s.Failures = i + case "timeout": + d, err := parseDurationFlag(kv[1]) + if err != nil { + return err + } + + s.Timeout = d + case "half-open-requests": + i, err := strconv.Atoi(kv[1]) + if err != nil { + return err + } + + s.HalfOpenRequests = i + case "idle-ttl": + d, err := parseDurationFlag(kv[1]) + if err != nil { + return err + } + + s.IdleTTL = d + default: + return errInvalidBreakerConfig + } + } + + if s.Type == circuit.BreakerNone { + s.Type = circuit.ConsecutiveFailures + } + + *b = append(*b, s) + return nil +} diff --git a/cmd/skipper/main.go b/cmd/skipper/main.go index 3a71494d47..617d51c124 100644 --- a/cmd/skipper/main.go +++ b/cmd/skipper/main.go @@ -135,6 +135,7 @@ var ( experimentalUpgrade bool printVersion bool maxLoopbacks int + breakers breakerFlags ) func init() { @@ -182,6 +183,7 @@ func init() { flag.BoolVar(&experimentalUpgrade, "experimental-upgrade", defaultExperimentalUpgrade, experimentalUpgradeUsage) flag.BoolVar(&printVersion, "version", false, versionUsage) flag.IntVar(&maxLoopbacks, "max-loopbacks", proxy.DefaultMaxLoopbacks, maxLoopbacksUsage) + flag.Var(&breakers, "breaker", breakerUsage) flag.Parse() } @@ -268,6 +270,7 @@ func main() { BackendFlushInterval: backendFlushInterval, ExperimentalUpgrade: experimentalUpgrade, MaxLoopbacks: maxLoopbacks, + BreakerSettings: breakers, } if insecure { diff --git a/doc.go b/doc.go index b37eb8c584..790fcfdac5 100644 --- a/doc.go +++ b/doc.go @@ -6,7 +6,8 @@ Skipper works as an HTTP reverse proxy that is responsible for mapping incoming requests to multiple HTTP backend services, based on routes that are selected by the request attributes. At the same time, both the requests and the responses can be augmented by a filter chain that is -specifically defined for each route. +specifically defined for each route. Optionally, it can provide circuit +breaker mechanism individually for each backend host. Skipper can load and update the route definitions from multiple data sources without being restarted. @@ -190,6 +191,15 @@ Skipper can use additional data sources, provided by extensions. Sources must implement the DataClient interface in the routing package. +Circuit Breaker + +Skipper provides circuit breakers, configured either globally, based on +backend hosts or based on individual routes. It supports two types of +circuit breaker behavior: open on N consecutive failures, or open on N +failures out of M requests. For details, see: +https://godoc.org/github.com/zalando/skipper/circuit. + + Running Skipper Skipper can be started with the default executable command 'skipper', or diff --git a/eskip/doc.go b/eskip/doc.go index f70ce2f2bf..b242283025 100644 --- a/eskip/doc.go +++ b/eskip/doc.go @@ -12,14 +12,14 @@ expression prefixed by its id and a ':'. A routing table example: - catalog: Path("/*category") -> "https://catalog.example.org"; - productPage: Path("/products/:id") -> "https://products.example.org"; - userAccount: Path("/user/:id/*userpage") -> "https://users.example.org"; + catalog: Path("/*category") -> "https://catalog.example.org"; + productPage: Path("/products/:id") -> "https://products.example.org"; + userAccount: Path("/user/:id/*userpage") -> "https://users.example.org"; - // 404 - notfound: * -> - modPath(/.+/, "/notfound.html") -> static("/", "/var/www") -> - + // 404 + notfound: * -> + modPath(/.+/, "/notfound.html") -> static("/", "/var/www") -> + A route expression always contains a match expression and a backend expression, and it can contain optional filter expressions. The match @@ -28,9 +28,9 @@ filters take place between the matcher and the backend. A route expression example: - Path("/api/*resource") && Header("Accept", "application/json") -> - modPath("^/api", "") -> requestHeader("X-Type", "external") -> - "https://api.example.org" + Path("/api/*resource") && Header("Accept", "application/json") -> + modPath("^/api", "") -> requestHeader("X-Type", "external") -> + "https://api.example.org" Match Expressions - Predicates @@ -41,11 +41,11 @@ separated by '&&'. A match expression example: - Path("/api/*resource") && Header("Accept", "application/json") + Path("/api/*resource") && Header("Accept", "application/json") The following predicate expressions are recognized: - Path("/some/path") + Path("/some/path") The path predicate accepts a single argument, that can be a fixed path like "/some/path", or it can contain wildcards in place of one or more @@ -56,7 +56,7 @@ supports the glob standard, e.g. "/some/path/**" will work as expected. The arguments are available to the filters while processing the matched requests. - PathSubtree("/some/path") + PathSubtree("/some/path") The path subtree predicate behaves similar to the path predicate, but it matches the exact path in the definition and any sub path below it. @@ -65,37 +65,37 @@ name "*". If a free wildcard is appended to the definition, e.g. PathSubtree("/some/path/*rest"), the free wildcard name is used instead of "*". The simple wildcards behave similar to the Path predicate. - PathRegexp(/regular-expression/) + PathRegexp(/regular-expression/) The regexp path predicate accepts a regular expression as a single argument that needs to be matched by the request path. The regular expression can be surrounded by '/' or '"'. - Host(/host-regular-expression/) + Host(/host-regular-expression/) The host predicate accepts a regular expression as a single argument that needs to be matched by the host header in the request. - Method("HEAD") + Method("HEAD") The method predicate is used to match the http request method. - Header("Accept", "application/json") + Header("Accept", "application/json") The header predicate is used to match the http headers in the request. It accepts two arguments, the name of the header field and the exact header value to match. - HeaderRegexp("Accept", /\Wapplication\/json\W/) + HeaderRegexp("Accept", /\Wapplication\/json\W/) The header regexp predicate works similar to the header expression, but the value to be matched is a regular expression. - * + * Catch all predicate. - Any() + Any() Former, deprecated form of the catch all predicate. @@ -106,7 +106,7 @@ Eskip supports custom route matching predicates, that can be implemented in extensions. The notation of custom predicates is the same as of the built-in route matching expressions: - Foo(3.14, "bar") + Foo(3.14, "bar") During parsing, custom predicates may define any arbitrary list of arguments of types number, string or regular expression, and it is the @@ -125,42 +125,48 @@ filter. The arguments can be of type string ("a string"), number A filter example: - setResponseHeader("max-age", "86400") -> static("/", "/var/www/public") + setResponseHeader("max-age", "86400") -> static("/", "/var/www/public") The default Skipper implementation provides the following built-in filters: - setRequestHeader("header-name", "header-value") + setRequestHeader("header-name", "header-value") - setResponseHeader("header-name", "header-value") + setResponseHeader("header-name", "header-value") - appendRequestHeader("header-name", "header-value") + appendRequestHeader("header-name", "header-value") - appendResponseHeader("header-name", "header-value") + appendResponseHeader("header-name", "header-value") - dropRequestHeader("header-name") + dropRequestHeader("header-name") - dropResponseHeader("header-name") + dropResponseHeader("header-name") - modPath(/regular-expression/, "replacement") + modPath(/regular-expression/, "replacement") - setPath("/replacement") + setPath("/replacement") - redirectTo(302, "https://ui.example.org") + redirectTo(302, "https://ui.example.org") - flowId("reuse", 64) + flowId("reuse", 64) - healthcheck() + healthcheck() - static("/images", "/var/www/images") + static("/images", "/var/www/images") - stripQuery("true") + stripQuery("true") - preserveHost() + preserveHost() - status(418) + status(418) - tee("https://audit-logging.example.org") + tee("https://audit-logging.example.org") + + consecutiveBreaker() + + rateBreaker() + + disableBreaker() For details about the built-in filters, please, refer to the documentation of the skipper/filters package. Skipper is designed to be @@ -183,7 +189,7 @@ a loopback. A network endpoint address example: - "http://internal.example.org:9090" + "http://internal.example.org:9090" An endpoint address backend is surrounded by '"'. It contains the scheme and the hostname of the endpoint, and optionally the port number that is @@ -191,7 +197,7 @@ inferred from the scheme if not specified. A shunt backend: - + The shunt backend means that the route will not forward the request to a network endpoint, but the router will handle the request itself. By @@ -219,9 +225,9 @@ character. Example with comments: - // forwards to the API endpoint - route1: Path("/api") -> "https://api.example.org"; - route2: * -> // everything else 404 + // forwards to the API endpoint + route1: Path("/api") -> "https://api.example.org"; + route2: * -> // everything else 404 Regular expressions diff --git a/filters/builtin/builtin.go b/filters/builtin/builtin.go index 1e236c3dba..b35169b3e4 100644 --- a/filters/builtin/builtin.go +++ b/filters/builtin/builtin.go @@ -6,6 +6,7 @@ package builtin import ( "github.com/zalando/skipper/filters" "github.com/zalando/skipper/filters/auth" + "github.com/zalando/skipper/filters/circuit" "github.com/zalando/skipper/filters/cookie" "github.com/zalando/skipper/filters/diag" "github.com/zalando/skipper/filters/flowid" @@ -83,6 +84,9 @@ func MakeRegistry() filters.Registry { cookie.NewRequestCookie(), cookie.NewResponseCookie(), cookie.NewJSCookie(), + circuit.NewConsecutiveBreaker(), + circuit.NewRateBreaker(), + circuit.NewDisableBreaker(), } { r.Register(s) } diff --git a/filters/circuit/breaker.go b/filters/circuit/breaker.go new file mode 100644 index 0000000000..3c54a3d1b5 --- /dev/null +++ b/filters/circuit/breaker.go @@ -0,0 +1,216 @@ +/* +Package circuit provides filters to control the circuit breaker settings on the route level. + +For detailed documentation of the circuit breakers, see https://godoc.org/github.com/zalando/skipper/circuit. +*/ +package circuit + +import ( + "time" + + "github.com/zalando/skipper/circuit" + "github.com/zalando/skipper/filters" +) + +const ( + ConsecutiveBreakerName = "consecutiveBreaker" + RateBreakerName = "rateBreaker" + DisableBreakerName = "disableBreaker" + RouteSettingsKey = "#circuitbreakersettings" +) + +type spec struct { + typ circuit.BreakerType +} + +type filter struct { + settings circuit.BreakerSettings +} + +func getIntArg(a interface{}) (int, error) { + if i, ok := a.(int); ok { + return i, nil + } + + if f, ok := a.(float64); ok { + return int(f), nil + } + + return 0, filters.ErrInvalidFilterParameters +} + +func getDurationArg(a interface{}) (time.Duration, error) { + if s, ok := a.(string); ok { + return time.ParseDuration(s) + } + + i, err := getIntArg(a) + return time.Duration(i) * time.Millisecond, err +} + +// NewConsecutiveBreaker creates a filter specification to instantiate consecutiveBreaker() filters. +// +// These filters set a breaker for the current route that open if the backend failures for the route reach a +// value of N, where N is a mandatory argument of the filter: +// +// consecutiveBreaker(15) +// +// The filter accepts the following optional arguments: timeout (milliseconds or duration string), +// half-open-requests (integer), idle-ttl (milliseconds or duration string). +func NewConsecutiveBreaker() filters.Spec { + return &spec{typ: circuit.ConsecutiveFailures} +} + +// NewRateBreaker creates a filter specification to instantiate rateBreaker() filters. +// +// These filters set a breaker for the current route that open if the backend failures for the route reach a +// value of N within a window of the last M requests, where N and M are mandatory arguments of the filter: +// +// rateBreaker(30, 300) +// +// The filter accepts the following optional arguments: timeout (milliseconds or duration string), +// half-open-requests (integer), idle-ttl (milliseconds or duration string). +func NewRateBreaker() filters.Spec { + return &spec{typ: circuit.FailureRate} +} + +// NewDisableBreaker disables the circuit breaker for a route. It doesn't accept any arguments. +func NewDisableBreaker() filters.Spec { + return &spec{} +} + +func (s *spec) Name() string { + switch s.typ { + case circuit.ConsecutiveFailures: + return ConsecutiveBreakerName + case circuit.FailureRate: + return RateBreakerName + default: + return DisableBreakerName + } +} + +func consecutiveFilter(args []interface{}) (filters.Filter, error) { + if len(args) == 0 || len(args) > 4 { + return nil, filters.ErrInvalidFilterParameters + } + + failures, err := getIntArg(args[0]) + if err != nil { + return nil, err + } + + var timeout time.Duration + if len(args) > 1 { + timeout, err = getDurationArg(args[1]) + if err != nil { + return nil, err + } + } + + var halfOpenRequests int + if len(args) > 2 { + halfOpenRequests, err = getIntArg(args[2]) + if err != nil { + return nil, err + } + } + + var idleTTL time.Duration + if len(args) > 3 { + idleTTL, err = getDurationArg(args[3]) + if err != nil { + return nil, err + } + } + + return &filter{ + settings: circuit.BreakerSettings{ + Type: circuit.ConsecutiveFailures, + Failures: failures, + Timeout: timeout, + HalfOpenRequests: halfOpenRequests, + IdleTTL: idleTTL, + }, + }, nil +} + +func rateFilter(args []interface{}) (filters.Filter, error) { + if len(args) < 2 || len(args) > 5 { + return nil, filters.ErrInvalidFilterParameters + } + + failures, err := getIntArg(args[0]) + if err != nil { + return nil, err + } + + window, err := getIntArg(args[1]) + if err != nil { + return nil, err + } + + var timeout time.Duration + if len(args) > 2 { + timeout, err = getDurationArg(args[2]) + if err != nil { + return nil, err + } + } + + var halfOpenRequests int + if len(args) > 3 { + halfOpenRequests, err = getIntArg(args[3]) + if err != nil { + return nil, err + } + } + + var idleTTL time.Duration + if len(args) > 4 { + idleTTL, err = getDurationArg(args[4]) + if err != nil { + return nil, err + } + } + + return &filter{ + settings: circuit.BreakerSettings{ + Type: circuit.FailureRate, + Failures: failures, + Window: window, + Timeout: timeout, + HalfOpenRequests: halfOpenRequests, + IdleTTL: idleTTL, + }, + }, nil +} + +func disableFilter(args []interface{}) (filters.Filter, error) { + if len(args) != 0 { + return nil, filters.ErrInvalidFilterParameters + } + + return &filter{ + settings: circuit.BreakerSettings{ + Type: circuit.BreakerDisabled, + }, + }, nil +} + +func (s *spec) CreateFilter(args []interface{}) (filters.Filter, error) { + switch s.typ { + case circuit.ConsecutiveFailures: + return consecutiveFilter(args) + case circuit.FailureRate: + return rateFilter(args) + default: + return disableFilter(args) + } +} + +func (f *filter) Request(ctx filters.FilterContext) { + ctx.StateBag()[RouteSettingsKey] = f.settings +} + +func (f *filter) Response(filters.FilterContext) {} diff --git a/filters/circuit/breaker_test.go b/filters/circuit/breaker_test.go new file mode 100644 index 0000000000..e8025256a3 --- /dev/null +++ b/filters/circuit/breaker_test.go @@ -0,0 +1,130 @@ +package circuit + +import ( + "testing" + "time" + + "github.com/zalando/skipper/circuit" + "github.com/zalando/skipper/filters" + "github.com/zalando/skipper/filters/filtertest" +) + +func TestArgs(t *testing.T) { + test := func(s filters.Spec, fail bool, args ...interface{}) func(*testing.T) { + return func(t *testing.T) { + if _, err := s.CreateFilter(args); fail && err == nil { + t.Error("failed to fail") + } else if !fail && err != nil { + t.Error(err) + } + } + } + + testOK := func(s filters.Spec, args ...interface{}) func(*testing.T) { return test(s, false, args...) } + testErr := func(s filters.Spec, args ...interface{}) func(*testing.T) { return test(s, true, args...) } + + t.Run("consecutive", func(t *testing.T) { + s := NewConsecutiveBreaker() + t.Run("missing", testErr(s, nil)) + t.Run("too many", testErr(s, 6, "1m", 12, "30m", 42)) + t.Run("wrong failure count", testErr(s, "6", "1m", 12)) + t.Run("wrong timeout", testErr(s, 6, "foo", 12)) + t.Run("wrong half-open requests", testErr(s, 6, "1m", "foo")) + t.Run("only failure count", testOK(s, 6)) + t.Run("only failure count and timeout", testOK(s, 6, "1m")) + t.Run("full", testOK(s, 6, "1m", 12)) + t.Run("timeout as milliseconds", testOK(s, 6, 60000, 12)) + t.Run("with idle ttl", testOK(s, 6, 60000, 12, "30m")) + }) + + t.Run("rate", func(t *testing.T) { + s := NewRateBreaker() + t.Run("missing both", testErr(s, nil)) + t.Run("missing window", testErr(s, 30)) + t.Run("too many", testErr(s, 30, 300, "1m", 45, "30m", 42)) + t.Run("wrong failure count", testErr(s, "30", 300, "1m", 45)) + t.Run("wrong window", testErr(s, 30, "300", "1m", 45)) + t.Run("wrong timeout", testErr(s, 30, "300", "foo", 45)) + t.Run("wrong half-open requests", testErr(s, 30, "300", "1m", "foo")) + t.Run("only failures and window", testOK(s, 30, 300)) + t.Run("only failures, window and timeout", testOK(s, 30, 300, "1m")) + t.Run("full", testOK(s, 30, 300, "1m", 45)) + t.Run("timeout as milliseconds", testOK(s, 30, 300, 60000, 45)) + t.Run("with idle ttl", testOK(s, 30, 300, 60000, 12, "30m")) + }) + + t.Run("disable", func(t *testing.T) { + s := NewDisableBreaker() + t.Run("with args fail", testErr(s, 6)) + t.Run("no args, ok", testOK(s)) + }) +} + +func TestBreaker(t *testing.T) { + test := func( + s func() filters.Spec, + expect circuit.BreakerSettings, + args ...interface{}, + ) func(*testing.T) { + return func(t *testing.T) { + s := s() + + f, err := s.CreateFilter(args) + if err != nil { + t.Error(err) + } + + ctx := &filtertest.Context{ + FStateBag: make(map[string]interface{}), + } + + f.Request(ctx) + + settings, ok := ctx.StateBag()[RouteSettingsKey] + if !ok { + t.Error("failed to set the breaker settings") + } + + if settings != expect { + t.Error("invalid settings") + t.Log("got", settings) + t.Log("expected", expect) + } + } + } + + t.Run("consecutive breaker", test( + NewConsecutiveBreaker, + circuit.BreakerSettings{ + Type: circuit.ConsecutiveFailures, + Failures: 6, + Timeout: time.Minute, + HalfOpenRequests: 12, + }, + 6, + "1m", + 12, + )) + + t.Run("rate breaker", test( + NewRateBreaker, + circuit.BreakerSettings{ + Type: circuit.FailureRate, + Failures: 30, + Window: 300, + Timeout: time.Minute, + HalfOpenRequests: 12, + }, + 30, + 300, + "1m", + 12, + )) + + t.Run("disable breaker", test( + NewDisableBreaker, + circuit.BreakerSettings{ + Type: circuit.BreakerDisabled, + }, + )) +} diff --git a/proxy/breaker_test.go b/proxy/breaker_test.go new file mode 100644 index 0000000000..85136a966b --- /dev/null +++ b/proxy/breaker_test.go @@ -0,0 +1,621 @@ +package proxy_test + +import ( + "fmt" + "net/http" + "strings" + "testing" + "time" + + "github.com/zalando/skipper/circuit" + "github.com/zalando/skipper/eskip" + "github.com/zalando/skipper/filters/builtin" + circuitfilters "github.com/zalando/skipper/filters/circuit" + "github.com/zalando/skipper/proxy" + "github.com/zalando/skipper/proxy/proxytest" +) + +type breakerTestContext struct { + t *testing.T + proxy *proxytest.TestProxy + backends map[string]*failingBackend +} + +type scenarioStep func(*breakerTestContext) + +type breakerScenario struct { + title string + settings []circuit.BreakerSettings + filters map[string][]*eskip.Filter + steps []scenarioStep +} + +const ( + testConsecutiveFailureCount = 5 + testBreakerTimeout = 3 * time.Millisecond + testHalfOpenRequests = 3 + testRateWindow = 10 + testRateFailures = 4 + defaultHost = "default" +) + +func urlHostNerr(u string) string { + return strings.Split(u, "//")[1] +} + +func newBreakerProxy( + backends map[string]*failingBackend, + settings []circuit.BreakerSettings, + filters map[string][]*eskip.Filter, +) *proxytest.TestProxy { + params := proxy.Params{ + CloseIdleConnsPeriod: -1, + } + + // for testing, mapping the configured backend hosts to the random generated host + var r []*eskip.Route + if len(settings) > 0 { + for i := range settings { + b := backends[settings[i].Host] + if b == nil { + r = append(r, &eskip.Route{ + Id: defaultHost, + HostRegexps: []string{fmt.Sprintf("^%s$", defaultHost)}, + Filters: filters[defaultHost], + Backend: backends[defaultHost].url, + }) + } else { + r = append(r, &eskip.Route{ + Id: settings[i].Host, + HostRegexps: []string{fmt.Sprintf("^%s$", settings[i].Host)}, + Filters: filters[settings[i].Host], + Backend: backends[settings[i].Host].url, + }) + settings[i].Host = urlHostNerr(backends[settings[i].Host].url) + } + } + + params.CircuitBreakers = circuit.NewRegistry(settings...) + } else { + r = append(r, &eskip.Route{ + Backend: backends[defaultHost].url, + }) + } + + fr := builtin.MakeRegistry() + return proxytest.WithParams(fr, params, r...) +} + +func testBreaker(t *testing.T, s breakerScenario) { + backends := make(map[string]*failingBackend) + for _, si := range s.settings { + h := si.Host + if h == "" { + h = defaultHost + } + + backends[h] = newFailingBackend() + defer backends[h].close() + } + + if len(backends) == 0 { + backends[defaultHost] = newFailingBackend() + defer backends[defaultHost].close() + } + + p := newBreakerProxy(backends, s.settings, s.filters) + defer p.Close() + + steps := s.steps + c := &breakerTestContext{ + t: t, + proxy: p, + backends: backends, + } + + for !t.Failed() && len(steps) > 0 { + steps[0](c) + steps = steps[1:] + } +} + +func setBackendHostSucceed(c *breakerTestContext, host string) { + c.backends[host].succeed() +} + +func setBackendSucceed(c *breakerTestContext) { + setBackendHostSucceed(c, defaultHost) +} + +func setBackendFailForHost(c *breakerTestContext, host string) { + c.backends[host].fail() +} + +func setBackendHostFail(host string) scenarioStep { + return func(c *breakerTestContext) { + setBackendFailForHost(c, host) + } +} + +func setBackendFail(c *breakerTestContext) { + setBackendFailForHost(c, defaultHost) +} + +func setBackendHostDown(c *breakerTestContext, host string) { + c.backends[host].down() +} + +func setBackendDown(c *breakerTestContext) { + setBackendHostDown(c, defaultHost) +} + +func proxyRequestHost(c *breakerTestContext, host string) (*http.Response, error) { + req, err := http.NewRequest("GET", c.proxy.URL, nil) + if err != nil { + return nil, err + } + + req.Host = host + + rsp, err := (&http.Client{}).Do(req) + if err != nil { + return nil, err + } + + rsp.Body.Close() + return rsp, nil +} + +func proxyRequest(c *breakerTestContext) (*http.Response, error) { + return proxyRequestHost(c, defaultHost) +} + +func checkStatus(c *breakerTestContext, rsp *http.Response, expected int) { + if rsp.StatusCode != expected { + c.t.Errorf( + "wrong response status: %d instead of %d", + rsp.StatusCode, + expected, + ) + } +} + +func requestHostForStatus(c *breakerTestContext, host string, expectedStatus int) *http.Response { + rsp, err := proxyRequestHost(c, host) + if err != nil { + c.t.Error(err) + return nil + } + + checkStatus(c, rsp, expectedStatus) + return rsp +} + +func requestHost(host string, expectedStatus int) scenarioStep { + return func(c *breakerTestContext) { + requestHostForStatus(c, host, expectedStatus) + } +} + +func request(expectedStatus int) scenarioStep { + return func(c *breakerTestContext) { + requestHostForStatus(c, defaultHost, expectedStatus) + } +} + +func requestOpenForHost(c *breakerTestContext, host string) { + rsp := requestHostForStatus(c, host, 503) + if c.t.Failed() { + return + } + + if rsp.Header.Get("X-Circuit-Open") != "true" { + c.t.Error("failed to set circuit open header") + } +} + +func requestHostOpen(host string) scenarioStep { + return func(c *breakerTestContext) { + requestOpenForHost(c, host) + } +} + +func requestOpen(c *breakerTestContext) { + requestOpenForHost(c, defaultHost) +} + +func checkBackendForCounter(c *breakerTestContext, host string, count int) { + if c.backends[host].counter() != count { + c.t.Error("invalid number of requests on the backend") + } + + c.backends[host].resetCounter() +} + +func checkBackendHostCounter(host string, count int) scenarioStep { + return func(c *breakerTestContext) { + checkBackendForCounter(c, host, count) + } +} + +func checkBackendCounter(count int) scenarioStep { + return func(c *breakerTestContext) { + checkBackendForCounter(c, defaultHost, count) + } +} + +// as in scenario step N times +func times(n int, s ...scenarioStep) scenarioStep { + return func(c *breakerTestContext) { + for !c.t.Failed() && n > 0 { + for _, si := range s { + si(c) + } + + n-- + } + } +} + +func wait(d time.Duration) scenarioStep { + return func(*breakerTestContext) { + time.Sleep(d) + } +} + +func trace(msg string) scenarioStep { + return func(*breakerTestContext) { + println(msg) + } +} + +func TestBreakerConsecutive(t *testing.T) { + for _, s := range []breakerScenario{{ + title: "no breaker", + steps: []scenarioStep{ + request(200), + checkBackendCounter(1), + setBackendFail, + times(testConsecutiveFailureCount, request(500)), + checkBackendCounter(testConsecutiveFailureCount), + request(500), + checkBackendCounter(1), + }, + }, { + title: "open", + settings: []circuit.BreakerSettings{{ + Type: circuit.ConsecutiveFailures, + Failures: testConsecutiveFailureCount, + }}, + steps: []scenarioStep{ + request(200), + checkBackendCounter(1), + setBackendFail, + times(testConsecutiveFailureCount, request(500)), + checkBackendCounter(testConsecutiveFailureCount), + requestOpen, + // checkBackendCounter(0), + }, + }, { + title: "open, fail to close", + settings: []circuit.BreakerSettings{{ + Type: circuit.ConsecutiveFailures, + Failures: testConsecutiveFailureCount, + Timeout: testBreakerTimeout, + HalfOpenRequests: testHalfOpenRequests, + }}, + steps: []scenarioStep{ + request(200), + checkBackendCounter(1), + setBackendFail, + times(testConsecutiveFailureCount, request(500)), + checkBackendCounter(testConsecutiveFailureCount), + requestOpen, + checkBackendCounter(0), + wait(2 * testBreakerTimeout), + request(500), + checkBackendCounter(1), + requestOpen, + checkBackendCounter(0), + }, + }, { + title: "open, fixed during timeout", + settings: []circuit.BreakerSettings{{ + Type: circuit.ConsecutiveFailures, + Failures: testConsecutiveFailureCount, + Timeout: testBreakerTimeout, + HalfOpenRequests: testHalfOpenRequests, + }}, + steps: []scenarioStep{ + request(200), + checkBackendCounter(1), + setBackendFail, + times(testConsecutiveFailureCount, request(500)), + checkBackendCounter(testConsecutiveFailureCount), + requestOpen, + checkBackendCounter(0), + wait(2 * testBreakerTimeout), + setBackendSucceed, + times(testHalfOpenRequests+1, request(200)), + checkBackendCounter(testHalfOpenRequests + 1), + }, + }, { + title: "open, fail again during half open", + settings: []circuit.BreakerSettings{{ + Type: circuit.ConsecutiveFailures, + Failures: testConsecutiveFailureCount, + Timeout: testBreakerTimeout, + HalfOpenRequests: testHalfOpenRequests, + }}, + steps: []scenarioStep{ + request(200), + checkBackendCounter(1), + setBackendFail, + times(testConsecutiveFailureCount, request(500)), + checkBackendCounter(testConsecutiveFailureCount), + requestOpen, + checkBackendCounter(0), + wait(2 * testBreakerTimeout), + setBackendSucceed, + times(1, request(200)), + checkBackendCounter(1), + setBackendFail, + times(1, request(500)), + checkBackendCounter(1), + requestOpen, + checkBackendCounter(0), + }, + }, { + title: "open due to backend being down", + settings: []circuit.BreakerSettings{{ + Type: circuit.ConsecutiveFailures, + Failures: testConsecutiveFailureCount, + }}, + steps: []scenarioStep{ + request(200), + checkBackendCounter(1), + setBackendDown, + times(testConsecutiveFailureCount, request(503)), + checkBackendCounter(0), + requestOpen, + }, + }} { + t.Run(s.title, func(t *testing.T) { + testBreaker(t, s) + }) + } +} + +func TestBreakerRate(t *testing.T) { + for _, s := range []breakerScenario{{ + title: "open", + settings: []circuit.BreakerSettings{{ + Type: circuit.FailureRate, + Failures: testRateFailures, + Window: testRateWindow, + }}, + steps: []scenarioStep{ + times(testRateWindow, request(200)), + checkBackendCounter(testRateWindow), + setBackendFail, + times(testRateFailures, request(500)), + checkBackendCounter(testRateFailures), + requestOpen, + checkBackendCounter(0), + }, + }, { + title: "open, fail to close", + settings: []circuit.BreakerSettings{{ + Type: circuit.FailureRate, + Failures: testRateFailures, + Window: testRateWindow, + Timeout: testBreakerTimeout, + HalfOpenRequests: testHalfOpenRequests, + }}, + steps: []scenarioStep{ + times(testRateWindow, request(200)), + checkBackendCounter(testRateWindow), + setBackendFail, + times(testRateFailures, request(500)), + checkBackendCounter(testRateFailures), + requestOpen, + checkBackendCounter(0), + wait(2 * testBreakerTimeout), + request(500), + checkBackendCounter(1), + requestOpen, + checkBackendCounter(0), + }, + }, { + title: "open, fixed during timeout", + settings: []circuit.BreakerSettings{{ + Type: circuit.FailureRate, + Failures: testRateFailures, + Window: testRateWindow, + Timeout: testBreakerTimeout, + HalfOpenRequests: testHalfOpenRequests, + }}, + steps: []scenarioStep{ + times(testRateWindow, request(200)), + checkBackendCounter(testRateWindow), + setBackendFail, + times(testRateFailures, request(500)), + checkBackendCounter(testRateFailures), + requestOpen, + checkBackendCounter(0), + wait(2 * testBreakerTimeout), + setBackendSucceed, + times(testHalfOpenRequests+1, request(200)), + checkBackendCounter(testHalfOpenRequests + 1), + }, + }, { + title: "open, fail again during half open", + settings: []circuit.BreakerSettings{{ + Type: circuit.FailureRate, + Failures: testRateFailures, + Window: testRateWindow, + Timeout: testBreakerTimeout, + HalfOpenRequests: testHalfOpenRequests, + }}, + steps: []scenarioStep{ + times(testRateWindow, request(200)), + checkBackendCounter(testRateWindow), + setBackendFail, + times(testRateFailures, request(500)), + checkBackendCounter(testRateFailures), + requestOpen, + checkBackendCounter(0), + wait(2 * testBreakerTimeout), + setBackendSucceed, + times(1, request(200)), + checkBackendCounter(1), + setBackendFail, + times(1, request(500)), + checkBackendCounter(1), + requestOpen, + checkBackendCounter(0), + }, + }, { + title: "open due to backend being down", + settings: []circuit.BreakerSettings{{ + Type: circuit.FailureRate, + Failures: testRateFailures, + Window: testRateWindow, + }}, + steps: []scenarioStep{ + times(testRateWindow, request(200)), + checkBackendCounter(testRateWindow), + setBackendDown, + times(testRateFailures, request(503)), + checkBackendCounter(0), + requestOpen, + }, + }} { + t.Run(s.title, func(t *testing.T) { + testBreaker(t, s) + }) + } +} + +func TestBreakerMultipleHosts(t *testing.T) { + testBreaker(t, breakerScenario{ + settings: []circuit.BreakerSettings{{ + Type: circuit.FailureRate, + Failures: testRateFailures + 2, + Window: testRateWindow, + }, { + Host: "foo", + Type: circuit.BreakerDisabled, + }, { + Host: "bar", + Type: circuit.FailureRate, + Failures: testRateFailures, + Window: testRateWindow, + }}, + steps: []scenarioStep{ + times( + testRateWindow, + request(200), + requestHost("foo", 200), + requestHost("bar", 200), + ), + checkBackendCounter(testRateWindow), + checkBackendHostCounter("foo", testRateWindow), + checkBackendHostCounter("bar", testRateWindow), + setBackendFail, + trace("setting fail"), + setBackendHostFail("foo"), + setBackendHostFail("bar"), + times(testRateFailures, + request(500), + requestHost("foo", 500), + requestHost("bar", 500), + ), + checkBackendCounter(testRateFailures), + checkBackendHostCounter("foo", testRateFailures), + checkBackendHostCounter("bar", testRateFailures), + request(500), + requestHost("foo", 500), + requestHostOpen("bar"), + checkBackendCounter(1), + checkBackendHostCounter("foo", 1), + checkBackendHostCounter("bar", 0), + request(500), + requestHost("foo", 500), + checkBackendCounter(1), + checkBackendHostCounter("foo", 1), + requestOpen, + requestHost("foo", 500), + // checkBackendCounter(0), + checkBackendHostCounter("foo", 1), + }, + }) +} + +func TestBreakerMultipleHostsAndRouteBasedSettings(t *testing.T) { + testBreaker(t, breakerScenario{ + settings: []circuit.BreakerSettings{{ + Type: circuit.FailureRate, + Failures: testRateFailures + 2, + Window: testRateWindow, + }, { + Host: "foo", + Type: circuit.FailureRate, + Failures: testRateFailures + 1, + Window: testRateWindow, + }, { + Host: "bar", + Type: circuit.FailureRate, + Failures: testRateFailures + 1, + Window: testRateWindow, + }}, + filters: map[string][]*eskip.Filter{ + "foo": {{ + Name: circuitfilters.DisableBreakerName, + }}, + "bar": {{ + Name: circuitfilters.RateBreakerName, + Args: []interface{}{ + testRateFailures, + testRateWindow, + }, + }}, + }, + steps: []scenarioStep{ + times( + testRateWindow, + request(200), + requestHost("foo", 200), + requestHost("bar", 200), + ), + checkBackendCounter(testRateWindow), + checkBackendHostCounter("foo", testRateWindow), + checkBackendHostCounter("bar", testRateWindow), + setBackendFail, + setBackendHostFail("foo"), + setBackendHostFail("bar"), + times(testRateFailures, + request(500), + requestHost("foo", 500), + requestHost("bar", 500), + ), + checkBackendCounter(testRateFailures), + checkBackendHostCounter("foo", testRateFailures), + checkBackendHostCounter("bar", testRateFailures), + request(500), + requestHost("foo", 500), + requestHostOpen("bar"), + checkBackendCounter(1), + checkBackendHostCounter("foo", 1), + checkBackendHostCounter("bar", 0), + request(500), + requestHost("foo", 500), + checkBackendCounter(1), + checkBackendHostCounter("foo", 1), + requestOpen, + requestHost("foo", 500), + checkBackendCounter(0), + checkBackendHostCounter("foo", 1), + }, + }) +} diff --git a/proxy/doc.go b/proxy/doc.go index 13e65a9ef4..2afe921e23 100644 --- a/proxy/doc.go +++ b/proxy/doc.go @@ -103,7 +103,7 @@ from the external data sources, and are tested against the requests before the general routing tree. -Handling the 'Host' header +Handling the Host header The default behavior regarding the 'Host' header of the proxy requests is that the proxy ignores the value set in the incoming request. This @@ -123,6 +123,18 @@ and `SetOutgoingHost()` methods of the `FilterContext` need to be used instead of the `Request.Header` map. +Circuit Breakers + +When configured, skipper can use circuit breakers for the backend +requests. It asks the registry for a matching circuit breaker for +every request, and if there is any, checks if it is closed before +sending out the backend request. On connection errors and responses +with a status code higher than 499, it reports a failure to the +current breaker, otherwise it reports a success. + +For details, see: https://godoc.org/github.com/zalando/skipper/circuit. + + Proxy Example The below example demonstrates creating a routing proxy as a standard diff --git a/proxy/failingbackend_test.go b/proxy/failingbackend_test.go new file mode 100644 index 0000000000..39993b041f --- /dev/null +++ b/proxy/failingbackend_test.go @@ -0,0 +1,208 @@ +package proxy_test + +import ( + "errors" + "net" + "net/http" + "testing" +) + +type failingBackend struct { + c chan *failingBackend + up bool + healthy bool + address string + url string + server *http.Server + count int +} + +func freeAddress() string { + l, err := net.Listen("tcp", ":0") + if err != nil { + panic(err) + } + + defer l.Close() + return l.Addr().String() +} + +func newFailingBackend() *failingBackend { + address := freeAddress() + b := &failingBackend{ + c: make(chan *failingBackend, 1), + healthy: true, + address: address, + url: "http://" + address, + } + + b.startSynced() + b.c <- b + return b +} + +func (b *failingBackend) synced(f func()) { + b = <-b.c + f() + b.c <- b +} + +func (b *failingBackend) succeed() { + b.synced(func() { + if b.healthy { + return + } + + if !b.up { + b.startSynced() + } + + b.healthy = true + }) +} + +func (b *failingBackend) fail() { + b.synced(func() { + b.healthy = false + }) +} + +func (b *failingBackend) counter() int { + var count int + b.synced(func() { + count = b.count + }) + + return count +} + +func (b *failingBackend) resetCounter() { + b.synced(func() { + b.count = 0 + }) +} + +func (b *failingBackend) startSynced() { + if b.up { + return + } + + l, err := net.Listen("tcp", b.address) + if err != nil { + panic(err) + } + + b.server = &http.Server{Handler: b} + + b.up = true + go func(s *http.Server, l net.Listener) { + err := s.Serve(l) + if err != nil && err != http.ErrServerClosed { + panic(err) + } + }(b.server, l) +} + +func (b *failingBackend) start() { + b.synced(b.startSynced) +} + +func (b *failingBackend) closeSynced() { + if !b.up { + return + } + + b.server.Close() + b.up = false +} + +func (b *failingBackend) close() { + b.synced(b.closeSynced) +} + +func (b *failingBackend) down() { b.close() } + +func (b *failingBackend) reset() { + b.synced(func() { + b.closeSynced() + b.count = 0 + b.startSynced() + }) +} + +func (b *failingBackend) ServeHTTP(w http.ResponseWriter, r *http.Request) { + b.synced(func() { + b.count++ + if !b.healthy { + w.WriteHeader(http.StatusInternalServerError) + } + }) +} + +func TestFailingBackend(t *testing.T) { + b := newFailingBackend() + defer b.close() + + req := func(fail, down bool) error { + rsp, err := http.Get(b.url) + if down { + if err == nil { + return errors.New("failed to fail") + } + + return nil + } else if err != nil { + return err + } + + defer rsp.Body.Close() + + if fail && rsp.StatusCode != http.StatusInternalServerError || + !fail && rsp.StatusCode != http.StatusOK { + t.Error("invalid status", rsp.StatusCode) + } + + return nil + } + + if err := req(false, false); err != nil { + t.Error(err) + return + } + + b.fail() + if err := req(true, false); err != nil { + t.Error(err) + return + } + + b.succeed() + if err := req(false, false); err != nil { + t.Error(err) + return + } + + b.fail() + if err := req(true, false); err != nil { + t.Error(err) + return + } + + b.down() + if err := req(false, true); err != nil { + t.Error(err) + return + } + + b.start() + if err := req(true, false); err != nil { + t.Error(err) + return + } + + b.succeed() + if err := req(false, false); err != nil { + t.Error(err) + return + } +} diff --git a/proxy/proxy.go b/proxy/proxy.go index 4737c6136b..d784864c67 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -13,7 +13,9 @@ import ( "time" log "github.com/sirupsen/logrus" + "github.com/zalando/skipper/circuit" "github.com/zalando/skipper/eskip" + circuitfilters "github.com/zalando/skipper/filters/circuit" "github.com/zalando/skipper/metrics" "github.com/zalando/skipper/routing" ) @@ -110,9 +112,20 @@ type Params struct { // -1. Note, that disabling looping by this option, may result // wrong routing depending on the current configuration. MaxLoopbacks int + + // CircuitBreakers provides a registry that skipper can use to + // find the matching circuit breaker for backend requests. If not + // set, no circuit breakers are used. + CircuitBreakers *circuit.Registry } -var errMaxLoopbacksReached = errors.New("max loopbacks reached") +var ( + errMaxLoopbacksReached = errors.New("max loopbacks reached") + errCircuitBreakerOpen = &proxyError{ + err: errors.New("circuit breaker open"), + code: http.StatusServiceUnavailable, + } +) // When set, the proxy will skip the TLS verification on outgoing requests. func (f Flags) Insecure() bool { return f&Insecure != 0 } @@ -155,6 +168,7 @@ type Proxy struct { flushInterval time.Duration experimentalUpgrade bool maxLoops int + breakers *circuit.Registry } // proxyError is used to wrap errors during proxying and to indicate @@ -313,6 +327,7 @@ func WithParams(p Params) *Proxy { flushInterval: p.FlushInterval, experimentalUpgrade: p.ExperimentalUpgrade, maxLoops: p.MaxLoopbacks, + breakers: p.CircuitBreakers, } } @@ -470,6 +485,23 @@ func (p *Proxy) makeBackendRequest(ctx *context) (*http.Response, error) { return response, nil } +func (p *Proxy) checkBreaker(c *context) (func(bool), bool) { + if p.breakers == nil { + return nil, true + } + + settings, _ := c.stateBag[circuitfilters.RouteSettingsKey].(circuit.BreakerSettings) + settings.Host = c.outgoingHost + + b := p.breakers.Get(settings) + if b == nil { + return nil, true + } + + done, ok := b.Allow() + return done, ok +} + func (p *Proxy) do(ctx *context) error { if ctx.loopCounter > p.maxLoops { return errMaxLoopbacksReached @@ -515,13 +547,26 @@ func (p *Proxy) do(ctx *context) error { ctx.outgoingDebugRequest = debugReq ctx.setResponse(&http.Response{Header: make(http.Header)}, p.flags.PreserveOriginal()) } else { + done, allow := p.checkBreaker(ctx) + if !allow { + return errCircuitBreakerOpen + } + backendStart := time.Now() rsp, err := p.makeBackendRequest(ctx) if err != nil { + if done != nil { + done(false) + } + p.metrics.IncErrorsBackend(ctx.route.Id) return err } + if done != nil { + done(rsp.StatusCode < http.StatusInternalServerError) + } + ctx.setResponse(rsp, p.flags.PreserveOriginal()) p.metrics.MeasureBackend(ctx.route.Id, backendStart) p.metrics.MeasureBackendHost(ctx.route.Host, backendStart) @@ -557,6 +602,49 @@ func (p *Proxy) serveResponse(ctx *context) { } } +func (p *Proxy) errorResponse(ctx *context, err error) { + perr, ok := err.(*proxyError) + if ok && perr.handled { + return + } + + id := unknownRouteID + if ctx.route != nil { + id = ctx.route.Id + } + + code := http.StatusInternalServerError + if ok && perr.code != 0 { + code = perr.code + } + + if p.flags.Debug() { + di := &debugInfo{ + incoming: ctx.originalRequest, + outgoing: ctx.outgoingDebugRequest, + response: ctx.response, + err: err, + filterPanics: ctx.debugFilterPanics, + } + + if ctx.route != nil { + di.route = &ctx.route.Route + } + + dbgResponse(ctx.responseWriter, di) + return + } + + switch { + case err == errCircuitBreakerOpen: + ctx.responseWriter.Header().Set("X-Circuit-Open", "true") + default: + log.Errorf("error while proxying, route %s, status code %d: %v", id, code, err) + } + + p.sendError(ctx, id, code) +} + // http.Handler implementation func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := newContext(w, r, p.flags.PreserveOriginal()) @@ -573,38 +661,7 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { err := p.do(ctx) if err != nil { - if perr, ok := err.(*proxyError); !ok || !perr.handled { - id := unknownRouteID - if ctx.route != nil { - id = ctx.route.Id - } - - code := http.StatusInternalServerError - if ok && perr.code != 0 { - code = perr.code - } - - if p.flags.Debug() { - di := &debugInfo{ - incoming: ctx.originalRequest, - outgoing: ctx.outgoingDebugRequest, - response: ctx.response, - err: err, - filterPanics: ctx.debugFilterPanics, - } - - if ctx.route != nil { - di.route = &ctx.route.Route - } - - dbgResponse(w, di) - return - } - - p.sendError(ctx, id, code) - log.Errorf("error while proxying, route %s, status code %d: %v", id, code, err) - } - + p.errorResponse(ctx, err) return } diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go index 7c5bae3c2d..ddf0f14c4b 100644 --- a/proxy/proxy_test.go +++ b/proxy/proxy_test.go @@ -480,14 +480,14 @@ func TestAppliesFilters(t *testing.T) { } } -type breaker struct { +type shunter struct { resp *http.Response } -func (b *breaker) Request(c filters.FilterContext) { c.Serve(b.resp) } -func (*breaker) Response(filters.FilterContext) {} -func (b *breaker) CreateFilter(fc []interface{}) (filters.Filter, error) { return b, nil } -func (*breaker) Name() string { return "breaker" } +func (b *shunter) Request(c filters.FilterContext) { c.Serve(b.resp) } +func (*shunter) Response(filters.FilterContext) {} +func (b *shunter) CreateFilter(fc []interface{}) (filters.Filter, error) { return b, nil } +func (*shunter) Name() string { return "shunter" } func TestBreakFilterChain(t *testing.T) { s := startTestServer([]byte("Hello World!"), 0, func(r *http.Request) { @@ -503,14 +503,14 @@ func TestBreakFilterChain(t *testing.T) { StatusCode: http.StatusUnauthorized, Status: "Impossible body", } - fr.Register(&breaker{resp1}) + fr.Register(&shunter{resp1}) fr.Register(builtin.NewResponseHeader()) doc := fmt.Sprintf(`breakerDemo: - Path("/breaker") -> + Path("/shunter") -> requestHeader("X-Expected", "request header") -> responseHeader("X-Expected", "response header") -> - breaker() -> + shunter() -> requestHeader("X-Unexpected", "foo") -> responseHeader("X-Unexpected", "bar") -> "%s"`, s.URL) @@ -522,7 +522,7 @@ func TestBreakFilterChain(t *testing.T) { defer tp.close() - r, _ := http.NewRequest("GET", "https://www.example.org/breaker", nil) + r, _ := http.NewRequest("GET", "https://www.example.org/shunter", nil) w := httptest.NewRecorder() tp.proxy.ServeHTTP(w, r) @@ -535,11 +535,11 @@ func TestBreakFilterChain(t *testing.T) { } if _, has := r.Header["X-Unexpected"]; has { - t.Error("Request has an unexpected header from a filter after the breaker in the chain") + t.Error("Request has an unexpected header from a filter after the shunter in the chain") } if _, has := w.Header()["X-Unexpected"]; has { - t.Error("Response has an unexpected header from a filter after the breaker in the chain") + t.Error("Response has an unexpected header from a filter after the shunter in the chain") } if w.Code != http.StatusUnauthorized && w.Body.String() != "Impossible body" { diff --git a/readme.md b/readme.md index e469798fa3..9e1952abfb 100644 --- a/readme.md +++ b/readme.md @@ -90,6 +90,7 @@ Skipper's [godoc](https://godoc.org/github.com/zalando/skipper) page includes de - Service Backends - Route Definitions - Data Sources +- Circuit Breakers - Extending It with Customized Predicates, Filters, and Builds - Proxy Packages - Logging and Metrics diff --git a/skipper.go b/skipper.go index 86aa868af2..f11a68a81d 100644 --- a/skipper.go +++ b/skipper.go @@ -8,6 +8,7 @@ import ( "time" log "github.com/sirupsen/logrus" + "github.com/zalando/skipper/circuit" "github.com/zalando/skipper/dataclients/kubernetes" "github.com/zalando/skipper/eskipfile" "github.com/zalando/skipper/etcd" @@ -226,7 +227,12 @@ type Options struct { // Experimental feature to handle protocol Upgrades for Websockets, SPDY, etc. ExperimentalUpgrade bool + // MaxLoopbacks defines the maximum number of loops that the proxy can execute when the routing table + // contains loop backends (). MaxLoopbacks int + + // BreakerSettings contain global and host specific settings for the circuit breakers. + BreakerSettings []circuit.BreakerSettings } func createDataClients(o Options, auth innkeeper.Authentication) ([]routing.DataClient, error) { @@ -448,6 +454,10 @@ func Run(o Options) error { MaxLoopbacks: o.MaxLoopbacks, } + if len(o.BreakerSettings) > 0 { + proxyParams.CircuitBreakers = circuit.NewRegistry(o.BreakerSettings...) + } + if o.DebugListener != "" { do := proxyParams do.Flags |= proxy.Debug