From 3f2f407656155829baa743904f335136cf8e17d5 Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Sat, 15 Apr 2017 20:43:00 +0000 Subject: [PATCH 01/15] update documentation; introduce SchedulerError --- api/v1/lib/extras/scheduler/controller/controller.go | 10 +++++++--- api/v1/lib/httpcli/http.go | 3 ++- api/v1/lib/httpcli/httpsched/state.go | 1 + 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/api/v1/lib/extras/scheduler/controller/controller.go b/api/v1/lib/extras/scheduler/controller/controller.go index fc7ca43c..1897da17 100644 --- a/api/v1/lib/extras/scheduler/controller/controller.go +++ b/api/v1/lib/extras/scheduler/controller/controller.go @@ -1,8 +1,6 @@ package controller import ( - "fmt" - "github.com/mesos/mesos-go/api/v1/lib" "github.com/mesos/mesos-go/api/v1/lib/encoding" "github.com/mesos/mesos-go/api/v1/lib/scheduler" @@ -131,13 +129,19 @@ func (ca *ContextAdapter) Error(err error) { } } +type SchedulerError string + +func (e SchedulerError) Error() string { + return string(e) +} + // DefaultHandler provides the minimum implementation required for correct controller behavior. func DefaultHandler(e *scheduler.Event) (err error) { if e.GetType() == scheduler.Event_ERROR { // it's recommended that we abort and re-try subscribing; returning an // error here will cause the event loop to terminate and the connection // will be reset. - err = fmt.Errorf("ERROR: %q", e.GetError().GetMessage()) + err = SchedulerError(e.GetError().GetMessage()) } return } diff --git a/api/v1/lib/httpcli/http.go b/api/v1/lib/httpcli/http.go index 04a2dd25..7a6f60c9 100644 --- a/api/v1/lib/httpcli/http.go +++ b/api/v1/lib/httpcli/http.go @@ -384,7 +384,8 @@ func With(opt ...ConfigOpt) DoFunc { return config.client.Do } -// Timeout returns an ConfigOpt that sets a Config's timeout and keep-alive timeout. +// Timeout returns an ConfigOpt that sets a Config's response header timeout, tls handshake timeout, +// and dialer timeout. 
func Timeout(d time.Duration) ConfigOpt { return func(c *Config) { c.transport.ResponseHeaderTimeout = d diff --git a/api/v1/lib/httpcli/httpsched/state.go b/api/v1/lib/httpcli/httpsched/state.go index 9c330762..76cefaf1 100644 --- a/api/v1/lib/httpcli/httpsched/state.go +++ b/api/v1/lib/httpcli/httpsched/state.go @@ -24,6 +24,7 @@ var ( ) type ( + // state implements calls.Caller and tracks connectivity with Mesos state struct { client *client // client is a handle to the original underlying HTTP client From 20c7a03b00881b3ea3e991845a2b18fe5bb46309 Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Sun, 16 Apr 2017 01:52:02 +0000 Subject: [PATCH 02/15] improved failover for non-checkpointing scheduler --- api/v1/cmd/example-executor/main.go | 9 +++- api/v1/cmd/example-scheduler/app/app.go | 53 +++++++++++++--------- api/v1/cmd/example-scheduler/app/config.go | 3 ++ api/v1/cmd/example-scheduler/app/state.go | 4 ++ api/v1/cmd/example-scheduler/app/store.go | 43 ++++++++++++++++++ api/v1/lib/client.go | 24 ++++++++++ api/v1/lib/httpcli/http.go | 5 +- api/v1/lib/httpcli/httpsched/httpsched.go | 2 +- api/v1/lib/httpcli/httpsched/state.go | 31 +++++++++++-- api/v1/lib/scheduler/calls/caller.go | 25 ++++++++-- 10 files changed, 167 insertions(+), 32 deletions(-) create mode 100644 api/v1/cmd/example-scheduler/app/store.go diff --git a/api/v1/cmd/example-executor/main.go b/api/v1/cmd/example-executor/main.go index dba69b47..7690d9e4 100644 --- a/api/v1/cmd/example-executor/main.go +++ b/api/v1/cmd/example-executor/main.go @@ -36,6 +36,13 @@ func main() { os.Exit(0) } +func maybeReconnect(cfg config.Config) <-chan struct{} { + if cfg.Checkpoint { + return backoff.Notifier(1*time.Second, cfg.SubscriptionBackoffMax*4/3, nil) + } + return nil +} + func run(cfg config.Config) { var ( apiURL = url.URL{ @@ -58,7 +65,7 @@ func run(cfg config.Config) { failedTasks: make(map[mesos.TaskID]mesos.TaskStatus), } subscribe = calls.Subscribe(nil, nil).With(state.callOptions...) 
- shouldReconnect = backoff.Notifier(1*time.Second, cfg.SubscriptionBackoffMax*4/3, nil) + shouldReconnect = maybeReconnect(cfg) disconnected = time.Now() handler = buildEventHandler(state) ) diff --git a/api/v1/cmd/example-scheduler/app/app.go b/api/v1/cmd/example-scheduler/app/app.go index a701f200..27a856ad 100644 --- a/api/v1/cmd/example-scheduler/app/app.go +++ b/api/v1/cmd/example-scheduler/app/app.go @@ -42,21 +42,28 @@ func Run(cfg Config) error { } func buildControllerConfig(state *internalState, shutdown <-chan struct{}) controller.Config { - controlContext := &controller.ContextAdapter{ - DoneFunc: func() bool { return state.done }, - FrameworkIDFunc: func() string { return state.frameworkID }, - ErrorFunc: func(err error) { - if err != nil && err != io.EOF { - log.Println(err) - } else { - log.Println("disconnected") - } - }, - } + var ( + frameworkIDStore = NewInMemoryIDStore() + controlContext = &controller.ContextAdapter{ + DoneFunc: func() bool { return state.done }, + // not called concurrently w/ subscription events, don't worry about using the atomic + FrameworkIDFunc: func() string { return frameworkIDStore.Get() }, + ErrorFunc: func(err error) { + if err != nil && err != io.EOF { + log.Println(err) + } else { + log.Println("disconnected") + } + }, + } + ) state.cli = calls.Decorators{ callMetrics(state.metricsAPI, time.Now, state.config.summaryMetrics), logCalls(map[scheduler.Call_Type]string{scheduler.Call_SUBSCRIBE: "connecting..."}), + // automatically set the frameworkID for all outgoing calls + calls.When(func() bool { return frameworkIDStore.Get() != "" }, + calls.FrameworkCaller(func() string { return frameworkIDStore.Get() })), }.Apply(state.cli) return controller.Config{ @@ -68,12 +75,12 @@ func buildControllerConfig(state *internalState, shutdown <-chan struct{}) contr Handler: events.Decorators{ eventMetrics(state.metricsAPI, time.Now, state.config.summaryMetrics), events.Decorator(logAllEvents).If(state.config.verbose), - 
}.Apply(buildEventHandler(state)), + }.Apply(buildEventHandler(state, frameworkIDStore)), } } // buildEventHandler generates and returns a handler to process events received from the subscription. -func buildEventHandler(state *internalState) events.Handler { +func buildEventHandler(state *internalState, frameworkIDStore IDStore) events.Handler { // TODO(jdef) would be nice to merge this ack handler with the status update handler below; need to // figure out appropriate error propagation among chained handlers. ack := events.AcknowledgeUpdates(func() calls.Caller { return state.cli }) @@ -104,19 +111,23 @@ func buildEventHandler(state *internalState) events.Handler { statusUpdate(state, e.GetUpdate().GetStatus()) return nil }, - scheduler.Event_SUBSCRIBED: func(e *scheduler.Event) (err error) { + scheduler.Event_SUBSCRIBED: func(e *scheduler.Event) error { log.Println("received a SUBSCRIBED event") - if state.frameworkID == "" { - state.frameworkID = e.GetSubscribed().GetFrameworkID().GetValue() + frameworkID := e.GetSubscribed().GetFrameworkID().GetValue() + if state.frameworkID == "" || state.frameworkID != frameworkID { + if state.frameworkID != "" && state.frameworkID != frameworkID && state.config.checkpoint { + state.done = true // TODO(jdef) not goroutine safe + return errors.New("frameworkID changed unexpectedly; failover may be broken") + } + state.frameworkID = frameworkID if state.frameworkID == "" { // sanity check - err = errors.New("mesos gave us an empty frameworkID") - } else { - // automatically set the frameworkID for all outgoing calls - state.cli = calls.FrameworkCaller(state.frameworkID).Apply(state.cli) + state.done = true // TODO(jdef) not goroutine safe + return errors.New("mesos gave us an empty frameworkID") } + frameworkIDStore.Set(frameworkID) } - return + return nil }, }), ) diff --git a/api/v1/cmd/example-scheduler/app/config.go b/api/v1/cmd/example-scheduler/app/config.go index 9a33e092..edbcb879 100644 --- 
a/api/v1/cmd/example-scheduler/app/config.go +++ b/api/v1/cmd/example-scheduler/app/config.go @@ -15,6 +15,7 @@ type Config struct { url string codec codec timeout time.Duration + failoverTimeout time.Duration checkpoint bool principal string hostname string @@ -47,6 +48,7 @@ func (cfg *Config) AddFlags(fs *flag.FlagSet) { fs.Var(&cfg.codec, "codec", "Codec to encode/decode scheduler API communications [protobuf, json]") fs.StringVar(&cfg.url, "url", cfg.url, "Mesos scheduler API URL") fs.DurationVar(&cfg.timeout, "timeout", cfg.timeout, "Mesos scheduler API connection timeout") + fs.DurationVar(&cfg.failoverTimeout, "failoverTimeout", cfg.failoverTimeout, "Framework failover timeout") fs.BoolVar(&cfg.checkpoint, "checkpoint", cfg.checkpoint, "Enable/disable framework checkpointing") fs.StringVar(&cfg.principal, "principal", cfg.principal, "Framework principal with which to authenticate") fs.StringVar(&cfg.hostname, "hostname", cfg.hostname, "Framework hostname that is advertised to the master") @@ -84,6 +86,7 @@ func NewConfig() Config { url: env("MESOS_MASTER_HTTP", "http://:5050/api/v1/scheduler"), codec: codec{Codec: &encoding.ProtobufCodec}, timeout: envDuration("MESOS_CONNECT_TIMEOUT", "20s"), + failoverTimeout: envDuration("FRAMEWORK_FAILOVER_TIMEOUT", "1000h"), checkpoint: true, server: server{address: env("LIBPROCESS_IP", "127.0.0.1")}, tasks: envInt("NUM_TASKS", "5"), diff --git a/api/v1/cmd/example-scheduler/app/state.go b/api/v1/cmd/example-scheduler/app/state.go index a81b3bcc..b66ebbbb 100644 --- a/api/v1/cmd/example-scheduler/app/state.go +++ b/api/v1/cmd/example-scheduler/app/state.go @@ -135,11 +135,15 @@ func buildHTTPSched(cfg Config, creds credentials) calls.Caller { } func buildFrameworkInfo(cfg Config) *mesos.FrameworkInfo { + failoverTimeout := cfg.failoverTimeout.Seconds() frameworkInfo := &mesos.FrameworkInfo{ User: cfg.user, Name: cfg.name, Checkpoint: &cfg.checkpoint, } + if cfg.failoverTimeout > 0 { + frameworkInfo.FailoverTimeout = 
&failoverTimeout + } if cfg.role != "" { frameworkInfo.Role = &cfg.role } diff --git a/api/v1/cmd/example-scheduler/app/store.go b/api/v1/cmd/example-scheduler/app/store.go new file mode 100644 index 00000000..bc7fa7a2 --- /dev/null +++ b/api/v1/cmd/example-scheduler/app/store.go @@ -0,0 +1,43 @@ +package app + +import "sync/atomic" + +// IDStore is a thread-safe abstraction to load and store a stringified ID. +type IDStore interface { + Get() string + Set(string) +} + +type IDStoreAdapter struct { + GetFunc func() string + SetFunc func(string) +} + +func (a IDStoreAdapter) Get() string { + if a.GetFunc != nil { + return a.GetFunc() + } + return "" +} + +func (a IDStoreAdapter) Set(s string) { + if a.SetFunc != nil { + a.SetFunc(s) + } +} + +func NewInMemoryIDStore() IDStore { + var frameworkID atomic.Value + return &IDStoreAdapter{ + GetFunc: func() string { + x := frameworkID.Load() + if x == nil { + return "" + } + return x.(string) + }, + SetFunc: func(s string) { + frameworkID.Store(s) + }, + } +} diff --git a/api/v1/lib/client.go b/api/v1/lib/client.go index b107b706..8f9da462 100644 --- a/api/v1/lib/client.go +++ b/api/v1/lib/client.go @@ -25,3 +25,27 @@ type Response interface { io.Closer Decoder() encoding.Decoder } + +// ResponseWrapper delegates to optional handler funcs for invocations of Response methods. 
+type ResponseWrapper struct { + Response Response + CloseFunc func() error + DecoderFunc func() encoding.Decoder +} + +func (wrapper *ResponseWrapper) Close() error { + if wrapper.CloseFunc != nil { + return wrapper.CloseFunc() + } + if wrapper.Response != nil { + return wrapper.Response.Close() + } + return nil +} + +func (wrapper *ResponseWrapper) Decoder() encoding.Decoder { + if wrapper.DecoderFunc != nil { + return wrapper.DecoderFunc() + } + return wrapper.Response.Decoder() +} diff --git a/api/v1/lib/httpcli/http.go b/api/v1/lib/httpcli/http.go index 7a6f60c9..ae5ebd07 100644 --- a/api/v1/lib/httpcli/http.go +++ b/api/v1/lib/httpcli/http.go @@ -128,7 +128,7 @@ type Client struct { func New(opts ...Opt) *Client { c := &Client{ codec: &encoding.ProtobufCodec, - do: With(), + do: With(DefaultConfigOpt...), header: http.Header{}, errorMapper: defaultErrorMapper, } @@ -352,6 +352,9 @@ type Config struct { type ConfigOpt func(*Config) +// DefaultConfigOpt represents the default client config options. +var DefaultConfigOpt []ConfigOpt + // With returns a DoFunc that executes HTTP round-trips. // The default implementation provides reasonable defaults for timeouts: // keep-alive, connection, request/response read/write, and TLS handshake. 
diff --git a/api/v1/lib/httpcli/httpsched/httpsched.go b/api/v1/lib/httpcli/httpsched/httpsched.go index 1965bcc3..24e01d03 100644 --- a/api/v1/lib/httpcli/httpsched/httpsched.go +++ b/api/v1/lib/httpcli/httpsched/httpsched.go @@ -38,7 +38,7 @@ type ( redirect RedirectSettings } - // Caller is the public interface this framework scheduler's should consume + // Caller is the public interface a framework scheduler should consume Caller interface { calls.Caller // httpDo is intentionally package-private; clients of this package may extend a Caller diff --git a/api/v1/lib/httpcli/httpsched/state.go b/api/v1/lib/httpcli/httpsched/state.go index 76cefaf1..4b7a4fc9 100644 --- a/api/v1/lib/httpcli/httpsched/state.go +++ b/api/v1/lib/httpcli/httpsched/state.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/mesos/mesos-go/api/v1/lib" + "github.com/mesos/mesos-go/api/v1/lib/encoding" "github.com/mesos/mesos-go/api/v1/lib/httpcli" "github.com/mesos/mesos-go/api/v1/lib/scheduler" "github.com/mesos/mesos-go/api/v1/lib/scheduler/calls" @@ -95,14 +96,34 @@ func disconnectedFn(state *state) stateFn { ) // (c) execute the call, save the result in resp, err - state.resp, state.err = subscribeCaller.Call(state.call) + stateResp, stateErr := subscribeCaller.Call(state.call) + state.err = stateErr + + // wrap the response: any errors processing the subscription stream should result in a + // transition to a disconnected state ASAP.
+ state.resp = &mesos.ResponseWrapper{ + Response: stateResp, + DecoderFunc: func() encoding.Decoder { + decoder := stateResp.Decoder() + return func(u encoding.Unmarshaler) (err error) { + err = decoder(u) + if err != nil { + state.m.Lock() + state.fn = disconnectedFn + state.m.Unlock() + _ = stateResp.Close() // swallow any error here + } + return + } + }, + } - // (d) if err != nil return unsubscribedFn - if state.err != nil { + // (d) if err != nil return disconnectedFn since we're unsubscribed + if stateErr != nil { return disconnectedFn } - // (e) else prepare callerTemporary w/ special header, return subscribingFn + // (e) else prepare callerTemporary w/ special header, return connectedFn since we're now subscribed state.caller = &callerTemporary{ opt: httpcli.DefaultHeader(headerMesosStreamID, mesosStreamID), callerInternal: state.client, @@ -121,7 +142,7 @@ func connectedFn(state *state) stateFn { // (b) execute call, save the result in resp, err state.resp, state.err = state.caller.Call(state.call) - // (c) return connectedFn; TODO(jdef) detect specific Mesos error codes as triggers -> disconnectedFn? + // stay connected, don't attempt to interpret errors here return connectedFn } diff --git a/api/v1/lib/scheduler/calls/caller.go b/api/v1/lib/scheduler/calls/caller.go index a9aa1dc9..75502b94 100644 --- a/api/v1/lib/scheduler/calls/caller.go +++ b/api/v1/lib/scheduler/calls/caller.go @@ -53,6 +53,22 @@ func (d Decorator) If(b bool) Decorator { return result } +// When applies the given decorator when the supplied bool func returns true. The supplied +// conditional func `f` is evaluated upon every Caller invocation.
+func When(f func() bool, maybeDecorate Decorator) Decorator { + if f == nil { + return noopDecorator + } + return func(h Caller) Caller { + return CallerFunc(func(c *scheduler.Call) (mesos.Response, error) { + if f() { + return maybeDecorate(h).Call(c) + } + return h.Call(c) + }) + } +} + // Apply applies the Decorators in the order they're listed such that the last Decorator invoked // generates the final (wrapping) Caller that is ultimately returned. func (ds Decorators) Combine() (result Decorator) { @@ -75,11 +91,14 @@ func (ds Decorators) Combine() (result Decorator) { return } -// FrameworkCaller generates and returns a Decorator that applies the given frameworkID to all calls. -func FrameworkCaller(frameworkID string) Decorator { +// FrameworkCaller generates and returns a Decorator that applies the given frameworkID to all calls (except SUBSCRIBE). +func FrameworkCaller(frameworkID func() string) Decorator { return func(h Caller) Caller { return CallerFunc(func(c *scheduler.Call) (mesos.Response, error) { - c.FrameworkID = &mesos.FrameworkID{Value: frameworkID} + // never overwrite framework ID for subscribe calls; the scheduler must do that part + if c.Type == nil || *c.Type != scheduler.Call_SUBSCRIBE { + c.FrameworkID = &mesos.FrameworkID{Value: frameworkID()} + } return h.Call(c) }) } From 0806021edba1f4a2590b16eacf706d17db68f73f Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Sun, 16 Apr 2017 02:51:20 +0000 Subject: [PATCH 03/15] additional debugging; restore original internal caller state upon detecting disconnect --- api/v1/cmd/example-scheduler/app/app.go | 1 + api/v1/lib/httpcli/httpsched/state.go | 22 +++++++++++++++------- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/api/v1/cmd/example-scheduler/app/app.go b/api/v1/cmd/example-scheduler/app/app.go index 27a856ad..f3e74986 100644 --- a/api/v1/cmd/example-scheduler/app/app.go +++ b/api/v1/cmd/example-scheduler/app/app.go @@ -125,6 +125,7 @@ func buildEventHandler(state 
*internalState, frameworkIDStore IDStore) events.Ha state.done = true // TODO(jdef) not goroutine safe return errors.New("mesos gave us an empty frameworkID") } + log.Println("FrameworkID", frameworkID) frameworkIDStore.Set(frameworkID) } return nil diff --git a/api/v1/lib/httpcli/httpsched/state.go b/api/v1/lib/httpcli/httpsched/state.go index 4b7a4fc9..64d12290 100644 --- a/api/v1/lib/httpcli/httpsched/state.go +++ b/api/v1/lib/httpcli/httpsched/state.go @@ -45,7 +45,7 @@ func maybeLogged(f httpcli.DoFunc) httpcli.DoFunc { if debug { return func(req *http.Request) (*http.Response, error) { if debug { - log.Println("wrapping request") + log.Println("wrapping request", *req) } resp, err := f(req) if debug && err == nil { @@ -99,8 +99,15 @@ func disconnectedFn(state *state) stateFn { stateResp, stateErr := subscribeCaller.Call(state.call) state.err = stateErr + // (d) if err != nil return disconnectedFn since we're unsubscribed + if stateErr != nil { + state.resp = nil + return disconnectedFn + } + // wrap the response: any errors processing the subscription stream should result in a // transition to a disconnected state ASAP. 
+ stateCaller := state.caller state.resp = &mesos.ResponseWrapper{ Response: stateResp, DecoderFunc: func() encoding.Decoder { @@ -109,8 +116,9 @@ func disconnectedFn(state *state) stateFn { err = decoder(u) if err != nil { state.m.Lock() + defer state.m.Unlock() + state.caller = stateCaller // restore the original caller state.fn = disconnectedFn - state.m.Unlock() _ = stateResp.Close() // swallow any error here } return @@ -118,11 +126,6 @@ func disconnectedFn(state *state) stateFn { }, } - // (d) if err != nil return disconnectedFn since we're unsubscribed - if stateErr != nil { - return disconnectedFn - } - // (e) else prepare callerTemporary w/ special header, return connectedFn since we're now subscribed state.caller = &callerTemporary{ opt: httpcli.DefaultHeader(headerMesosStreamID, mesosStreamID), @@ -151,5 +154,10 @@ func (state *state) Call(call *scheduler.Call) (resp mesos.Response, err error) defer state.m.Unlock() state.call = call state.fn = state.fn(state) + + if debug && state.err != nil { + log.Print(*call, state.err) + } + return state.resp, state.err } From 001040f6c6d72821378daf93ef97dc3a6254ab8d Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Sun, 16 Apr 2017 09:45:19 +0000 Subject: [PATCH 04/15] v1 scheduler API re-registration requires frameworkID **twice** in SUBSCRIBE --- .../extras/scheduler/controller/controller.go | 4 +++- api/v1/lib/httpcli/httpsched/state.go | 22 +++++++++++++++---- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/api/v1/lib/extras/scheduler/controller/controller.go b/api/v1/lib/extras/scheduler/controller/controller.go index 1897da17..76d15ab3 100644 --- a/api/v1/lib/extras/scheduler/controller/controller.go +++ b/api/v1/lib/extras/scheduler/controller/controller.go @@ -76,7 +76,9 @@ func (_ *controllerImpl) Run(config Config) (lastErr error) { for !config.Context.Done() { frameworkID := config.Context.FrameworkID() if config.Framework.GetFailoverTimeout() > 0 && frameworkID != "" { - 
subscribe.Subscribe.FrameworkInfo.ID = &mesos.FrameworkID{Value: frameworkID} + frameworkProto := &mesos.FrameworkID{Value: frameworkID} + subscribe.Subscribe.FrameworkInfo.ID = frameworkProto + subscribe.FrameworkID = frameworkProto } <-config.RegistrationTokens resp, err := config.Caller.Call(subscribe) diff --git a/api/v1/lib/httpcli/httpsched/state.go b/api/v1/lib/httpcli/httpsched/state.go index 64d12290..b2682f5c 100644 --- a/api/v1/lib/httpcli/httpsched/state.go +++ b/api/v1/lib/httpcli/httpsched/state.go @@ -2,6 +2,7 @@ package httpsched import ( "errors" + "io/ioutil" "log" "net/http" "sync" @@ -44,11 +45,9 @@ type ( func maybeLogged(f httpcli.DoFunc) httpcli.DoFunc { if debug { return func(req *http.Request) (*http.Response, error) { - if debug { - log.Println("wrapping request", *req) - } + log.Println("wrapping request", req.URL, req.Header) resp, err := f(req) - if debug && err == nil { + if err == nil { log.Printf("status %d", resp.StatusCode) for k := range resp.Header { log.Println("header " + k + ": " + resp.Header.Get(k)) @@ -85,6 +84,18 @@ func disconnectedFn(state *state) stateFn { err = errMissingMesosStreamId } } + if err == nil && debug && resp.StatusCode >= 400 { + // capture any error message in the response entity + // TODO(jdef) we should be doing this work in httpcli and sending + // back an APIError to capture these details. 
+ defer resp.Body.Close() + buf, _ := ioutil.ReadAll(resp.Body) + msg := string(buf) + if len(msg) > 80 { + msg = msg[:80] + } + log.Printf("ERROR(http,%d): %q", resp.StatusCode, msg) + } return } }) @@ -101,6 +112,9 @@ func disconnectedFn(state *state) stateFn { // (d) if err != nil return disconnectedFn since we're unsubscribed if stateErr != nil { + if stateResp != nil { + stateResp.Close() + } state.resp = nil return disconnectedFn } From b9c37ac2c6edf1a71fd5db8efe1c302021f065be Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Sun, 16 Apr 2017 10:00:37 +0000 Subject: [PATCH 05/15] undo redundant tracking of old state caller, we do not need it --- api/v1/lib/httpcli/httpsched/state.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/api/v1/lib/httpcli/httpsched/state.go b/api/v1/lib/httpcli/httpsched/state.go index b2682f5c..ae44112e 100644 --- a/api/v1/lib/httpcli/httpsched/state.go +++ b/api/v1/lib/httpcli/httpsched/state.go @@ -121,7 +121,6 @@ func disconnectedFn(state *state) stateFn { // wrap the response: any errors processing the subscription stream should result in a // transition to a disconnected state ASAP. 
- stateCaller := state.caller state.resp = &mesos.ResponseWrapper{ Response: stateResp, DecoderFunc: func() encoding.Decoder { @@ -131,7 +130,6 @@ func disconnectedFn(state *state) stateFn { if err != nil { state.m.Lock() defer state.m.Unlock() - state.caller = stateCaller // restore the original caller state.fn = disconnectedFn _ = stateResp.Close() // swallow any error here } From 479bbf23271568d12151ce886ea2ecf3ddf45b8c Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Sun, 16 Apr 2017 10:35:54 +0000 Subject: [PATCH 06/15] httpcli: introduce APIError, deprecate ProtocolError and Err* variables --- api/v1/lib/httpcli/http.go | 57 ++++++++++++++++++----- api/v1/lib/httpcli/httpsched/httpsched.go | 2 +- api/v1/lib/httpcli/httpsched/state.go | 13 ------ 3 files changed, 46 insertions(+), 26 deletions(-) diff --git a/api/v1/lib/httpcli/http.go b/api/v1/lib/httpcli/http.go index ae5ebd07..51125726 100644 --- a/api/v1/lib/httpcli/http.go +++ b/api/v1/lib/httpcli/http.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "log" "net" "net/http" @@ -16,26 +17,28 @@ import ( "github.com/mesos/mesos-go/api/v1/lib/recordio" ) +// Deprecation notice: these error variables are no longer returned by this module. Use APIError instead. +// TODO(jdef) remove these error vars after v0.0.3 var ( - // ErrNotLeader is returned by Do calls that are sent to a non leading Mesos master. + // ErrNotLeader is returned by Do calls that are sent to a non leading Mesos master. Deprecated. ErrNotLeader = errors.New("mesos: call sent to a non-leading master") - // ErrAuth is returned by Do calls that are not successfully authenticated. + // ErrAuth is returned by Do calls that are not successfully authenticated. Deprecated. ErrAuth = errors.New("mesos: call not authenticated") - // ErrUnsubscribed is returned by Do calls that are sent before a subscription is established. + // ErrUnsubscribed is returned by Do calls that are sent before a subscription is established. Deprecated. 
ErrUnsubscribed = errors.New("mesos: no subscription established") - // ErrVersion is returned by Do calls that are sent to an incompatible API version. + // ErrVersion is returned by Do calls that are sent to an incompatible API version. Deprecated. ErrVersion = errors.New("mesos: incompatible API version") - // ErrMalformed is returned by Do calls that are malformed. + // ErrMalformed is returned by Do calls that are malformed. Deprecated. ErrMalformed = errors.New("mesos: malformed request") - // ErrMediaType is returned by Do calls that are sent with an unsupported media type. + // ErrMediaType is returned by Do calls that are sent with an unsupported media type. Deprecated. ErrMediaType = errors.New("mesos: unsupported media type") // ErrRateLimit is returned by Do calls that are rate limited. This is a temporary condition - // that should clear. + // that should clear. Deprecated. ErrRateLimit = errors.New("mesos: rate limited") // ErrUnavailable is returned by Do calls that are sent to a master or agent that's in recovery, or - // does not yet realize that it's the leader. This is a temporary condition that should clear. + // does not yet realize that it's the leader. This is a temporary condition that should clear. Deprecated. ErrUnavailable = errors.New("mesos: mesos server unavailable") - // ErrNotFound could happen if the master or agent libprocess has not yet set up http routes + // ErrNotFound could happen if the master or agent libprocess has not yet set up http routes. Deprecated. ErrNotFound = errors.New("mesos: mesos http endpoint not found") // codeErrors maps HTTP response codes to their respective errors. @@ -54,21 +57,45 @@ var ( } defaultErrorMapper = ErrorMapperFunc(func(code int) error { + // for now, just scrape the string of the deprecated error var and use that + // as the APIError.Message. Eventually we'll get rid of the Err* variables. 
+ // TODO(jdef) simplify this after v0.0.3 err, ok := codeErrors[code] if !ok { - err = ProtocolError(code) + err = &APIError{Code: code} + } + return &APIError{ + Code: code, + Message: err.Error(), } - return err }) ) // ProtocolError is a generic error type returned for expected status codes // received from Mesos. +// Deprecated: no longer used in favor of APIError. +// TODO(jdef) remove this after v0.0.3 type ProtocolError int // Error implements error interface func (pe ProtocolError) Error() string { return fmt.Sprintf("Unexpected Mesos HTTP error: %d", int(pe)) } +// APIError captures HTTP error codes and messages generated by Mesos. +type APIError struct { + Code int // Code is the HTTP response status code generated by Mesos + Message string // Message briefly summarizes the nature of the error + Details string // Details captures the HTTP response entity, if any, supplied by Mesos +} + +func (err *APIError) Error() string { + return err.Message +} + +func IsErrNotLeader(err error) bool { + apiErr, ok := err.(*APIError) + return ok && apiErr.Code == http.StatusTemporaryRedirect +} + const ( debug = false // TODO(jdef) kill me at some point @@ -104,7 +131,8 @@ type Response struct { // implements mesos.Response func (r *Response) Decoder() encoding.Decoder { return r.decoder } -// ErrorMapperFunc generates an error for the given statusCode +// ErrorMapperFunc generates an error for the given statusCode. +// WARNING: this API will change in an upcoming release. 
type ErrorMapperFunc func(statusCode int) error // ResponseHandler is invoked to process an HTTP response @@ -233,6 +261,11 @@ func (c *Client) HandleResponse(res *http.Response, err error) (mesos.Response, // noop; no decoder for these types of calls default: err = c.errorMapper(res.StatusCode) + if apiErr, ok := err.(*APIError); ok && res.Body != nil { + // Annotate the APIError with Details from the response + buf, _ := ioutil.ReadAll(res.Body) + apiErr.Details = string(buf) + } } return &Response{ decoder: events, diff --git a/api/v1/lib/httpcli/httpsched/httpsched.go b/api/v1/lib/httpcli/httpsched/httpsched.go index 24e01d03..6affef9b 100644 --- a/api/v1/lib/httpcli/httpsched/httpsched.go +++ b/api/v1/lib/httpcli/httpsched/httpsched.go @@ -164,7 +164,7 @@ func (mre *mesosRedirectionError) Error() string { func (cli *client) redirectHandler() httpcli.Opt { return httpcli.HandleResponse(func(hres *http.Response, err error) (mesos.Response, error) { resp, err := cli.HandleResponse(hres, err) // default response handler - if err == nil || (err != nil && err != httpcli.ErrNotLeader) { + if err == nil || (err != nil && !httpcli.IsErrNotLeader(err)) { return resp, err } res, ok := resp.(*httpcli.Response) diff --git a/api/v1/lib/httpcli/httpsched/state.go b/api/v1/lib/httpcli/httpsched/state.go index ae44112e..bfb882e2 100644 --- a/api/v1/lib/httpcli/httpsched/state.go +++ b/api/v1/lib/httpcli/httpsched/state.go @@ -2,7 +2,6 @@ package httpsched import ( "errors" - "io/ioutil" "log" "net/http" "sync" @@ -84,18 +83,6 @@ func disconnectedFn(state *state) stateFn { err = errMissingMesosStreamId } } - if err == nil && debug && resp.StatusCode >= 400 { - // capture any error message in the response entity - // TODO(jdef) we should be doing this work in httpcli and sending - // back an APIError to capture these details. 
- defer resp.Body.Close() - buf, _ := ioutil.ReadAll(resp.Body) - msg := string(buf) - if len(msg) > 80 { - msg = msg[:80] - } - log.Printf("ERROR(http,%d): %q", resp.StatusCode, msg) - } return } }) From 88d61117821d6f362ef65137291a1cd704fb09d2 Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Sun, 16 Apr 2017 11:49:39 +0000 Subject: [PATCH 07/15] add gpuClusterCompat flag to permit running the framework on specialized clusters where all agents have GPU resources --- api/v1/cmd/example-scheduler/app/config.go | 2 ++ api/v1/cmd/example-scheduler/app/state.go | 5 +++++ 2 files changed, 7 insertions(+) diff --git a/api/v1/cmd/example-scheduler/app/config.go b/api/v1/cmd/example-scheduler/app/config.go index edbcb879..b3269378 100644 --- a/api/v1/cmd/example-scheduler/app/config.go +++ b/api/v1/cmd/example-scheduler/app/config.go @@ -39,6 +39,7 @@ type Config struct { compression bool credentials credentials authMode string + gpuClusterCompat bool } func (cfg *Config) AddFlags(fs *flag.FlagSet) { @@ -75,6 +76,7 @@ func (cfg *Config) AddFlags(fs *flag.FlagSet) { fs.StringVar(&cfg.credentials.username, "credentials.username", cfg.credentials.username, "Username for Mesos authentication") fs.StringVar(&cfg.credentials.password, "credentials.passwordFile", cfg.credentials.password, "Path to file that contains the password for Mesos authentication") fs.StringVar(&cfg.authMode, "authmode", cfg.authMode, "Method to use for Mesos authentication; specify '"+AuthModeBasic+"' for simple HTTP authentication") + fs.BoolVar(&cfg.gpuClusterCompat, "gpuClusterCompat", cfg.gpuClusterCompat, "When true the framework will receive offers from agents w/ GPU resources.") } const AuthModeBasic = "basic" diff --git a/api/v1/cmd/example-scheduler/app/state.go b/api/v1/cmd/example-scheduler/app/state.go index b66ebbbb..7610b994 100644 --- a/api/v1/cmd/example-scheduler/app/state.go +++ b/api/v1/cmd/example-scheduler/app/state.go @@ -157,6 +157,11 @@ func buildFrameworkInfo(cfg Config) 
*mesos.FrameworkInfo { log.Println("using labels:", cfg.labels) frameworkInfo.Labels = &mesos.Labels{Labels: cfg.labels} } + if cfg.gpuClusterCompat { + frameworkInfo.Capabilities = append(frameworkInfo.Capabilities, + mesos.FrameworkInfo_Capability{Type: mesos.FrameworkInfo_Capability_GPU_RESOURCES}, + ) + } return frameworkInfo } From ae60d1fe3cf4eb2284808997828f2cecc59c6122 Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Sun, 16 Apr 2017 12:21:39 +0000 Subject: [PATCH 08/15] avoid breaking the FrameworkCaller API, introduce DynamicFrameworkCaller instead --- api/v1/cmd/example-scheduler/app/app.go | 2 +- api/v1/lib/scheduler/calls/caller.go | 14 ++++++++++++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/api/v1/cmd/example-scheduler/app/app.go b/api/v1/cmd/example-scheduler/app/app.go index f3e74986..2090e75f 100644 --- a/api/v1/cmd/example-scheduler/app/app.go +++ b/api/v1/cmd/example-scheduler/app/app.go @@ -63,7 +63,7 @@ func buildControllerConfig(state *internalState, shutdown <-chan struct{}) contr logCalls(map[scheduler.Call_Type]string{scheduler.Call_SUBSCRIBE: "connecting..."}), // automatically set the frameworkID for all outgoing calls calls.When(func() bool { return frameworkIDStore.Get() != "" }, - calls.FrameworkCaller(func() string { return frameworkIDStore.Get() })), + calls.DynamicFrameworkCaller(func() string { return frameworkIDStore.Get() })), }.Apply(state.cli) return controller.Config{ diff --git a/api/v1/lib/scheduler/calls/caller.go b/api/v1/lib/scheduler/calls/caller.go index 75502b94..82b96903 100644 --- a/api/v1/lib/scheduler/calls/caller.go +++ b/api/v1/lib/scheduler/calls/caller.go @@ -91,8 +91,18 @@ func (ds Decorators) Combine() (result Decorator) { return } -// FrameworkCaller generates and returns a Decorator that applies the given frameworkID to all calls (except SUBSCRIBE). 
-func FrameworkCaller(frameworkID func() string) Decorator { +// FrameworkCaller generates and returns a Decorator that applies the given frameworkID to all calls. +func FrameworkCaller(frameworkID string) Decorator { + return func(h Caller) Caller { + return CallerFunc(func(c *scheduler.Call) (mesos.Response, error) { + c.FrameworkID = &mesos.FrameworkID{Value: frameworkID} + return h.Call(c) + }) + } +} + +// DynamicFrameworkCaller generates and returns a Decorator that applies the given frameworkID to all calls (except SUBSCRIBE). +func DynamicFrameworkCaller(frameworkID func() string) Decorator { return func(h Caller) Caller { return CallerFunc(func(c *scheduler.Call) (mesos.Response, error) { // never overwrite framework ID for subscribe calls; the scheduler must do that part From 6efae19dd49e7103f1104472f2b2e1d82d0c5d32 Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Sun, 16 Apr 2017 15:39:14 +0000 Subject: [PATCH 09/15] address review feedback and make state.done goroutine-safe --- api/v1/cmd/example-scheduler/app/app.go | 33 +++++++++++-------- api/v1/cmd/example-scheduler/app/state.go | 20 ++++++++++- .../extras/scheduler/controller/controller.go | 7 ++-- 3 files changed, 43 insertions(+), 17 deletions(-) diff --git a/api/v1/cmd/example-scheduler/app/app.go b/api/v1/cmd/example-scheduler/app/app.go index 2090e75f..8a88a0e8 100644 --- a/api/v1/cmd/example-scheduler/app/app.go +++ b/api/v1/cmd/example-scheduler/app/app.go @@ -21,6 +21,12 @@ var ( RegistrationMaxBackoff = 15 * time.Second ) +// StateError is returned when the system encounters an unresolvable state transition error and +// should likely exit. 
+type StateError string + +func (err StateError) Error() string { return string(err) } + func Run(cfg Config) error { log.Printf("scheduler running with configuration: %+v", cfg) shutdown := make(chan struct{}) @@ -45,15 +51,19 @@ func buildControllerConfig(state *internalState, shutdown <-chan struct{}) contr var ( frameworkIDStore = NewInMemoryIDStore() controlContext = &controller.ContextAdapter{ - DoneFunc: func() bool { return state.done }, - // not called concurrently w/ subscription events, don't worry about using the atomic + DoneFunc: state.isDone, FrameworkIDFunc: func() string { return frameworkIDStore.Get() }, ErrorFunc: func(err error) { - if err != nil && err != io.EOF { - log.Println(err) - } else { - log.Println("disconnected") + if err != nil { + if err != io.EOF { + log.Println(err) + } + if _, ok := err.(StateError); ok { + state.markDone() + } + return } + log.Println("disconnected") }, } ) @@ -116,14 +126,11 @@ func buildEventHandler(state *internalState, frameworkIDStore IDStore) events.Ha frameworkID := e.GetSubscribed().GetFrameworkID().GetValue() if state.frameworkID == "" || state.frameworkID != frameworkID { if state.frameworkID != "" && state.frameworkID != frameworkID && state.config.checkpoint { - state.done = true // TODO(jdef) not goroutine safe - return errors.New("frameworkID changed unexpectedly; failover may be broken") + return StateError("frameworkID changed unexpectedly; failover may be broken") } state.frameworkID = frameworkID if state.frameworkID == "" { - // sanity check - state.done = true // TODO(jdef) not goroutine safe - return errors.New("mesos gave us an empty frameworkID") + return StateError("mesos gave us an empty frameworkID") } log.Println("FrameworkID", frameworkID) frameworkIDStore.Set(frameworkID) @@ -245,7 +252,7 @@ func statusUpdate(state *internalState, s mesos.TaskStatus) { if state.tasksFinished == state.totalTasks { log.Println("mission accomplished, terminating") - state.done = true + state.markDone() 
} else { tryReviveOffers(state) } @@ -256,7 +263,7 @@ func statusUpdate(state *internalState, s mesos.TaskStatus) { " with reason " + s.GetReason().String() + " from source " + s.GetSource().String() + " with message '" + s.GetMessage() + "'") - state.done = true + state.markDone() } } diff --git a/api/v1/cmd/example-scheduler/app/state.go b/api/v1/cmd/example-scheduler/app/state.go index 7610b994..fa484271 100644 --- a/api/v1/cmd/example-scheduler/app/state.go +++ b/api/v1/cmd/example-scheduler/app/state.go @@ -8,6 +8,7 @@ import ( "math/rand" "net/http" "os" + "sync" "time" proto "github.com/gogo/protobuf/proto" @@ -215,10 +216,26 @@ func newInternalState(cfg Config) (*internalState, error) { metricsAPI: metricsAPI, cli: buildHTTPSched(cfg, creds), random: rand.New(rand.NewSource(time.Now().Unix())), + done: make(chan struct{}), } return state, nil } +func (state *internalState) markDone() { + state.doneOnce.Do(func() { + close(state.done) + }) +} + +func (state *internalState) isDone() bool { + select { + case <-state.done: + return true + default: + return false + } +} + type internalState struct { tasksLaunched int tasksFinished int @@ -232,6 +249,7 @@ type internalState struct { reviveTokens <-chan struct{} metricsAPI *metricsAPI err error - done bool + done chan struct{} + doneOnce sync.Once random *rand.Rand } diff --git a/api/v1/lib/extras/scheduler/controller/controller.go b/api/v1/lib/extras/scheduler/controller/controller.go index 76d15ab3..4d026321 100644 --- a/api/v1/lib/extras/scheduler/controller/controller.go +++ b/api/v1/lib/extras/scheduler/controller/controller.go @@ -131,9 +131,10 @@ func (ca *ContextAdapter) Error(err error) { } } -type SchedulerError string +// ErrEvent errors are generated by the DefaultHandler upon receiving an ERROR event from Mesos. 
+type ErrEvent string -func (e SchedulerError) Error() string { +func (e ErrEvent) Error() string { return string(e) } @@ -143,7 +144,7 @@ func DefaultHandler(e *scheduler.Event) (err error) { // it's recommended that we abort and re-try subscribing; returning an // error here will cause the event loop to terminate and the connection // will be reset. - err = SchedulerError(e.GetError().GetMessage()) + err = ErrEvent(e.GetError().GetMessage()) } return } From d64ccfac49e21509ef551f738af9a08bb0358330 Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Sun, 16 Apr 2017 23:10:01 +0000 Subject: [PATCH 10/15] s/DynamicFrameworkCaller/SubscribedCaller/ --- api/v1/cmd/example-scheduler/app/app.go | 2 +- api/v1/lib/scheduler/calls/caller.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/api/v1/cmd/example-scheduler/app/app.go b/api/v1/cmd/example-scheduler/app/app.go index 8a88a0e8..b244c803 100644 --- a/api/v1/cmd/example-scheduler/app/app.go +++ b/api/v1/cmd/example-scheduler/app/app.go @@ -73,7 +73,7 @@ func buildControllerConfig(state *internalState, shutdown <-chan struct{}) contr logCalls(map[scheduler.Call_Type]string{scheduler.Call_SUBSCRIBE: "connecting..."}), // automatically set the frameworkID for all outgoing calls calls.When(func() bool { return frameworkIDStore.Get() != "" }, - calls.DynamicFrameworkCaller(func() string { return frameworkIDStore.Get() })), + calls.SubscribedCaller(func() string { return frameworkIDStore.Get() })), }.Apply(state.cli) return controller.Config{ diff --git a/api/v1/lib/scheduler/calls/caller.go b/api/v1/lib/scheduler/calls/caller.go index 82b96903..b717c14d 100644 --- a/api/v1/lib/scheduler/calls/caller.go +++ b/api/v1/lib/scheduler/calls/caller.go @@ -92,6 +92,7 @@ func (ds Decorators) Combine() (result Decorator) { } // FrameworkCaller generates and returns a Decorator that applies the given frameworkID to all calls. +// Deprecated in favor of SubscribedCaller; should remove after v0.0.3. 
func FrameworkCaller(frameworkID string) Decorator { return func(h Caller) Caller { return CallerFunc(func(c *scheduler.Call) (mesos.Response, error) { @@ -101,8 +102,8 @@ func FrameworkCaller(frameworkID string) Decorator { } } -// DynamicFrameworkCaller generates and returns a Decorator that applies the given frameworkID to all calls (except SUBSCRIBE). -func DynamicFrameworkCaller(frameworkID func() string) Decorator { +// SubscribedCaller generates and returns a Decorator that applies the given frameworkID to all calls (except SUBSCRIBE). +func SubscribedCaller(frameworkID func() string) Decorator { return func(h Caller) Caller { return CallerFunc(func(c *scheduler.Call) (mesos.Response, error) { // never overwrite framework ID for subscribe calls; the scheduler must do that part From 85a88416f91376bf63fac61b704fd411fd321c89 Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Mon, 17 Apr 2017 01:07:07 +0000 Subject: [PATCH 11/15] address more review feedback (code cleanup) --- api/v1/cmd/example-scheduler/app/app.go | 21 ++++++++++++------- api/v1/cmd/example-scheduler/app/config.go | 2 +- .../extras/scheduler/controller/controller.go | 4 +--- api/v1/lib/scheduler/calls/calls.go | 17 +++++++++++++++ api/v1/lib/scheduler/options.go | 2 +- 5 files changed, 33 insertions(+), 13 deletions(-) diff --git a/api/v1/cmd/example-scheduler/app/app.go b/api/v1/cmd/example-scheduler/app/app.go index b244c803..9a6f4afe 100644 --- a/api/v1/cmd/example-scheduler/app/app.go +++ b/api/v1/cmd/example-scheduler/app/app.go @@ -2,6 +2,7 @@ package app import ( "errors" + "fmt" "io" "log" "strconv" @@ -124,16 +125,20 @@ func buildEventHandler(state *internalState, frameworkIDStore IDStore) events.Ha scheduler.Event_SUBSCRIBED: func(e *scheduler.Event) error { log.Println("received a SUBSCRIBED event") frameworkID := e.GetSubscribed().GetFrameworkID().GetValue() - if state.frameworkID == "" || state.frameworkID != frameworkID { - if state.frameworkID != "" && state.frameworkID != 
frameworkID && state.config.checkpoint { - return StateError("frameworkID changed unexpectedly; failover may be broken") - } + // order of `if` statements are important: tread carefully w/ respect to future changes + if frameworkID == "" { + // sanity check, should **never** happen + return StateError("mesos gave us an empty frameworkID") + } + if state.frameworkID != "" && state.frameworkID != frameworkID && state.config.checkpoint { + return StateError(fmt.Sprintf( + "frameworkID changed unexpectedly; failover exceeded timeout? (%s).", + state.config.failoverTimeout)) + } + if state.frameworkID != frameworkID { state.frameworkID = frameworkID - if state.frameworkID == "" { - return StateError("mesos gave us an empty frameworkID") - } - log.Println("FrameworkID", frameworkID) frameworkIDStore.Set(frameworkID) + log.Println("FrameworkID", frameworkID) } return nil }, diff --git a/api/v1/cmd/example-scheduler/app/config.go b/api/v1/cmd/example-scheduler/app/config.go index b3269378..ecd7cbb9 100644 --- a/api/v1/cmd/example-scheduler/app/config.go +++ b/api/v1/cmd/example-scheduler/app/config.go @@ -88,7 +88,7 @@ func NewConfig() Config { url: env("MESOS_MASTER_HTTP", "http://:5050/api/v1/scheduler"), codec: codec{Codec: &encoding.ProtobufCodec}, timeout: envDuration("MESOS_CONNECT_TIMEOUT", "20s"), - failoverTimeout: envDuration("FRAMEWORK_FAILOVER_TIMEOUT", "1000h"), + failoverTimeout: envDuration("SCHEDULER_FAILOVER_TIMEOUT", "1000h"), checkpoint: true, server: server{address: env("LIBPROCESS_IP", "127.0.0.1")}, tasks: envInt("NUM_TASKS", "5"), diff --git a/api/v1/lib/extras/scheduler/controller/controller.go b/api/v1/lib/extras/scheduler/controller/controller.go index 4d026321..5dbe1a71 100644 --- a/api/v1/lib/extras/scheduler/controller/controller.go +++ b/api/v1/lib/extras/scheduler/controller/controller.go @@ -76,9 +76,7 @@ func (_ *controllerImpl) Run(config Config) (lastErr error) { for !config.Context.Done() { frameworkID := config.Context.FrameworkID() if 
config.Framework.GetFailoverTimeout() > 0 && frameworkID != "" { - frameworkProto := &mesos.FrameworkID{Value: frameworkID} - subscribe.Subscribe.FrameworkInfo.ID = frameworkProto - subscribe.FrameworkID = frameworkProto + subscribe.With(calls.SubscribeTo(frameworkID)) } <-config.RegistrationTokens resp, err := config.Caller.Call(subscribe) diff --git a/api/v1/lib/scheduler/calls/calls.go b/api/v1/lib/scheduler/calls/calls.go index c28d28f1..3a41b8b7 100644 --- a/api/v1/lib/scheduler/calls/calls.go +++ b/api/v1/lib/scheduler/calls/calls.go @@ -53,6 +53,23 @@ func Subscribe(info *mesos.FrameworkInfo) *scheduler.Call { } } +// SubscribeTo returns an option that configures a SUBSCRIBE call w/ a framework ID. +// If frameworkID is "" then the SUBSCRIBE call is cleared of all framework ID references. +// Panics if the call does not contain a non-nil Subscribe reference. +func SubscribeTo(frameworkID string) scheduler.CallOpt { + return func(call *scheduler.Call) { + if call.Subscribe == nil { + panic("illegal call option: Call.Subscribe was unexpectedly nil") + } + var frameworkProto *mesos.FrameworkID + if frameworkID != "" { + frameworkProto = &mesos.FrameworkID{Value: frameworkID} + } + call.Subscribe.FrameworkInfo.ID = frameworkProto + call.FrameworkID = frameworkProto + } +} + type acceptBuilder struct { offerIDs map[mesos.OfferID]struct{} operations []mesos.Offer_Operation diff --git a/api/v1/lib/scheduler/options.go b/api/v1/lib/scheduler/options.go index 4349b0d0..c35f8e19 100644 --- a/api/v1/lib/scheduler/options.go +++ b/api/v1/lib/scheduler/options.go @@ -11,7 +11,7 @@ func (c *Call) With(opts ...CallOpt) *Call { return c } -// A ReconcileOpt is a functional option type for Call_Reconcile +// ReconcileOpt is a functional option type for Call_Reconcile type ReconcileOpt func(*Call_Reconcile) // With applies the given ReconcileOpt's to the receiving Call_Reconcile, returning it. 
From 18db7e134d953c4d81400275810bf2dd67afb1b7 Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Mon, 17 Apr 2017 11:59:56 +0000 Subject: [PATCH 12/15] httpcli: move APIError (and friends) to apierrors package; ErrorMapperFunc processes responses instead of status codes --- api/v1/lib/httpcli/apierrors/apierrors.go | 103 ++++++++++++++++++ api/v1/lib/httpcli/http.go | 122 +++++----------------- api/v1/lib/httpcli/httpsched/httpsched.go | 3 +- 3 files changed, 132 insertions(+), 96 deletions(-) create mode 100644 api/v1/lib/httpcli/apierrors/apierrors.go diff --git a/api/v1/lib/httpcli/apierrors/apierrors.go b/api/v1/lib/httpcli/apierrors/apierrors.go new file mode 100644 index 00000000..4d955849 --- /dev/null +++ b/api/v1/lib/httpcli/apierrors/apierrors.go @@ -0,0 +1,103 @@ +package apierrors + +import ( + "io/ioutil" + "net/http" +) + +var ( + // MsgNotLeader is returned by Do calls that are sent to a non leading Mesos master. Deprecated. + MsgNotLeader = "call sent to a non-leading master" + // MsgAuth is returned by Do calls that are not successfully authenticated. Deprecated. + MsgAuth = "call not authenticated" + // MsgUnsubscribed is returned by Do calls that are sent before a subscription is established. Deprecated. + MsgUnsubscribed = "no subscription established" + // MsgVersion is returned by Do calls that are sent to an incompatible API version. Deprecated. + MsgVersion = "incompatible API version" + // MsgMalformed is returned by Do calls that are malformed. Deprecated. + MsgMalformed = "malformed request" + // MsgMediaType is returned by Do calls that are sent with an unsupported media type. Deprecated. + MsgMediaType = "unsupported media type" + // MsgRateLimit is returned by Do calls that are rate limited. This is a temporary condition + // that should clear. Deprecated. + MsgRateLimit = "rate limited" + // MsgUnavailable is returned by Do calls that are sent to a master or agent that's in recovery, or + // does not yet realize that it's the leader. 
This is a temporary condition that should clear. Deprecated. + MsgUnavailable = "mesos server unavailable" + // MsgNotFound could happen if the master or agent libprocess has not yet set up http routes. Deprecated. + MsgNotFound = "mesos http endpoint not found" + + // ErrorTable maps HTTP response codes to their respective Mesos v1 API error messages. + ErrorTable = func() map[int]Error { + result := make(map[int]Error) + for code, msg := range map[int]string{ + http.StatusTemporaryRedirect: MsgNotLeader, + http.StatusBadRequest: MsgMalformed, + http.StatusConflict: MsgVersion, + http.StatusForbidden: MsgUnsubscribed, + http.StatusUnauthorized: MsgAuth, + http.StatusNotAcceptable: MsgMediaType, + http.StatusNotFound: MsgNotFound, + http.StatusServiceUnavailable: MsgUnavailable, + http.StatusTooManyRequests: MsgRateLimit, + } { + result[code] = Error{Code: code, Message: msg} + } + return result + }() +) + +// Error captures HTTP v1 API error codes and messages generated by Mesos. +type Error struct { + Code int // Code is the HTTP response status code generated by Mesos + Message string // Message briefly summarizes the nature of the error + Details string // Details captures the HTTP response entity, if any, supplied by Mesos +} + +// IsErrorCode returns true for all HTTP status codes that are not considered informational or successful. +func IsErrorCode(code int) bool { + return code >= 300 +} + +// FromResponse returns an `*Error` for a response containing a status code that indicates an error condition. +// The response body (if any) is captured in the Error.Details field. +// Returns nil for nil responses and responses with non-error status codes. +// See IsErrorCode. 
+func FromResponse(res *http.Response) error { + if res == nil { + return nil + } + + code := res.StatusCode + if !IsErrorCode(code) { + // non-error HTTP response codes don't generate errors + return nil + } + + details := "" + if res.Body != nil { + buf, _ := ioutil.ReadAll(res.Body) + details = string(buf) + } + + if msg, ok := ErrorTable[code]; ok { + // return a modified copy of whatever was in the error table + msg.Details = details + return &msg + } + + // unmapped errors are OK, they're just not "interpreted" (with a Message) + return &Error{ + Code: code, + Details: details, + } +} + +func (err *Error) Error() string { + return err.Message +} + +func IsErrNotLeader(err error) bool { + apiErr, ok := err.(*Error) + return ok && apiErr.Code == http.StatusTemporaryRedirect +} diff --git a/api/v1/lib/httpcli/http.go b/api/v1/lib/httpcli/http.go index 51125726..70e32e25 100644 --- a/api/v1/lib/httpcli/http.go +++ b/api/v1/lib/httpcli/http.go @@ -3,10 +3,8 @@ package httpcli import ( "bytes" "crypto/tls" - "errors" "fmt" "io" - "io/ioutil" "log" "net" "net/http" @@ -14,87 +12,20 @@ import ( "github.com/mesos/mesos-go/api/v1/lib" "github.com/mesos/mesos-go/api/v1/lib/encoding" + "github.com/mesos/mesos-go/api/v1/lib/httpcli/apierrors" "github.com/mesos/mesos-go/api/v1/lib/recordio" ) -// Deprecation notice: these error variables are no longer returned by this module. Use APIError instead. -// TODO(jdef) remove these error vars after v0.0.3 -var ( - // ErrNotLeader is returned by Do calls that are sent to a non leading Mesos master. Deprecated. - ErrNotLeader = errors.New("mesos: call sent to a non-leading master") - // ErrAuth is returned by Do calls that are not successfully authenticated. Deprecated. - ErrAuth = errors.New("mesos: call not authenticated") - // ErrUnsubscribed is returned by Do calls that are sent before a subscription is established. Deprecated. 
- ErrUnsubscribed = errors.New("mesos: no subscription established") - // ErrVersion is returned by Do calls that are sent to an incompatible API version. Deprecated. - ErrVersion = errors.New("mesos: incompatible API version") - // ErrMalformed is returned by Do calls that are malformed. Deprecated. - ErrMalformed = errors.New("mesos: malformed request") - // ErrMediaType is returned by Do calls that are sent with an unsupported media type. Deprecated. - ErrMediaType = errors.New("mesos: unsupported media type") - // ErrRateLimit is returned by Do calls that are rate limited. This is a temporary condition - // that should clear. Deprecated. - ErrRateLimit = errors.New("mesos: rate limited") - // ErrUnavailable is returned by Do calls that are sent to a master or agent that's in recovery, or - // does not yet realize that it's the leader. This is a temporary condition that should clear. Deprecated. - ErrUnavailable = errors.New("mesos: mesos server unavailable") - // ErrNotFound could happen if the master or agent libprocess has not yet set up http routes. Deprecated. - ErrNotFound = errors.New("mesos: mesos http endpoint not found") - - // codeErrors maps HTTP response codes to their respective errors. - codeErrors = map[int]error{ - http.StatusOK: nil, - http.StatusAccepted: nil, - http.StatusTemporaryRedirect: ErrNotLeader, - http.StatusBadRequest: ErrMalformed, - http.StatusConflict: ErrVersion, - http.StatusForbidden: ErrUnsubscribed, - http.StatusUnauthorized: ErrAuth, - http.StatusNotAcceptable: ErrMediaType, - http.StatusNotFound: ErrNotFound, - http.StatusServiceUnavailable: ErrUnavailable, - http.StatusTooManyRequests: ErrRateLimit, - } - - defaultErrorMapper = ErrorMapperFunc(func(code int) error { - // for now, just scrape the string of the deprecated error var and use that - // as the APIError.Message. Eventually we'll get rid of the Err* variables. 
- // TODO(jdef) simplify this after v0.0.3 - err, ok := codeErrors[code] - if !ok { - err = &APIError{Code: code} - } - return &APIError{ - Code: code, - Message: err.Error(), - } - }) -) - -// ProtocolError is a generic error type returned for expected status codes -// received from Mesos. -// Deprecated: no longer used in favor of APIError. -// TODO(jdef) remove this after v0.0.3 -type ProtocolError int +// ProtocolError is returned when we receive a response from Mesos that is outside of the HTTP API specification. +// Receipt of the following will yield protocol errors: +// - any unexpected non-error HTTP response codes (e.g. 199) +// - any unexpected Content-Type +type ProtocolError string // Error implements error interface -func (pe ProtocolError) Error() string { return fmt.Sprintf("Unexpected Mesos HTTP error: %d", int(pe)) } - -// APIError captures HTTP error codes and messages generated by Mesos. -type APIError struct { - Code int // Code is the HTTP response status code generated by Mesos - Message string // Message briefly summarizes the nature of the error - Details string // Details captures the HTTP response entity, if any, supplied by Mesos -} - -func (err *APIError) Error() string { - return err.Message -} +func (pe ProtocolError) Error() string { return string(pe) } -func IsErrNotLeader(err error) bool { - apiErr, ok := err.(*APIError) - return ok && apiErr.Code == http.StatusTemporaryRedirect -} +var defaultErrorMapper = ErrorMapperFunc(apierrors.FromResponse) const ( debug = false // TODO(jdef) kill me at some point @@ -124,16 +55,16 @@ type DoFunc func(*http.Request) (*http.Response, error) // Close when they're finished processing the response otherwise there may be connection leaks. 
type Response struct { io.Closer + Header http.Header + decoder encoding.Decoder - Header http.Header } // implements mesos.Response func (r *Response) Decoder() encoding.Decoder { return r.decoder } -// ErrorMapperFunc generates an error for the given statusCode. -// WARNING: this API will change in an upcoming release. -type ErrorMapperFunc func(statusCode int) error +// ErrorMapperFunc generates an error for the given response. +type ErrorMapperFunc func(*http.Response) error // ResponseHandler is invoked to process an HTTP response type ResponseHandler func(*http.Response, error) (mesos.Response, error) @@ -242,7 +173,14 @@ func (c *Client) HandleResponse(res *http.Response, err error) (mesos.Response, return nil, err } - var events encoding.Decoder + result := &Response{ + Closer: res.Body, + Header: res.Header, + } + if err = c.errorMapper(res); err != nil { + return result, err + } + switch res.StatusCode { case http.StatusOK: if debug { @@ -251,27 +189,21 @@ func (c *Client) HandleResponse(res *http.Response, err error) (mesos.Response, ct := res.Header.Get("Content-Type") if ct != c.codec.MediaTypes[indexResponseContentType] { res.Body.Close() - return nil, fmt.Errorf("unexpected content type: %q", ct) //TODO(jdef) extact this into a typed error + return nil, ProtocolError(fmt.Sprintf("unexpected content type: %q", ct)) } - events = c.codec.NewDecoder(recordio.NewFrameReader(res.Body)) + result.decoder = c.codec.NewDecoder(recordio.NewFrameReader(res.Body)) + case http.StatusAccepted: if debug { log.Println("request Accepted") } // noop; no decoder for these types of calls + default: - err = c.errorMapper(res.StatusCode) - if apiErr, ok := err.(*APIError); ok && res.Body != nil { - // Annotate the APIError with Details from the response - buf, _ := ioutil.ReadAll(res.Body) - apiErr.Details = string(buf) - } + return result, ProtocolError(fmt.Sprintf("unexpected mesos HTTP response code: %d", res.StatusCode)) } - return &Response{ - decoder: events, - 
Closer: res.Body, - Header: res.Header, - }, err + + return result, nil } // Do sends a Call and returns (a) a Response (should be closed when finished) that diff --git a/api/v1/lib/httpcli/httpsched/httpsched.go b/api/v1/lib/httpcli/httpsched/httpsched.go index 6affef9b..d09060c7 100644 --- a/api/v1/lib/httpcli/httpsched/httpsched.go +++ b/api/v1/lib/httpcli/httpsched/httpsched.go @@ -11,6 +11,7 @@ import ( "github.com/mesos/mesos-go/api/v1/lib/backoff" "github.com/mesos/mesos-go/api/v1/lib/encoding" "github.com/mesos/mesos-go/api/v1/lib/httpcli" + "github.com/mesos/mesos-go/api/v1/lib/httpcli/apierrors" "github.com/mesos/mesos-go/api/v1/lib/scheduler" "github.com/mesos/mesos-go/api/v1/lib/scheduler/calls" ) @@ -164,7 +165,7 @@ func (mre *mesosRedirectionError) Error() string { func (cli *client) redirectHandler() httpcli.Opt { return httpcli.HandleResponse(func(hres *http.Response, err error) (mesos.Response, error) { resp, err := cli.HandleResponse(hres, err) // default response handler - if err == nil || (err != nil && !httpcli.IsErrNotLeader(err)) { + if err == nil || (err != nil && !apierrors.IsErrNotLeader(err)) { return resp, err } res, ok := resp.(*httpcli.Response) From bd7142a2e8ed21f3d454266fbc24f967a9e296d7 Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Mon, 17 Apr 2017 13:30:37 +0000 Subject: [PATCH 13/15] more error API cleanup --- api/v1/lib/httpcli/apierrors/apierrors.go | 92 +++++++++++------------ api/v1/lib/httpcli/httpsched/httpsched.go | 10 ++- 2 files changed, 53 insertions(+), 49 deletions(-) diff --git a/api/v1/lib/httpcli/apierrors/apierrors.go b/api/v1/lib/httpcli/apierrors/apierrors.go index 4d955849..2600bbb7 100644 --- a/api/v1/lib/httpcli/apierrors/apierrors.go +++ b/api/v1/lib/httpcli/apierrors/apierrors.go @@ -5,57 +5,64 @@ import ( "net/http" ) +// Code is a Mesos HTTP v1 API response status code +type Code int + var ( - // MsgNotLeader is returned by Do calls that are sent to a non leading Mesos master. Deprecated. 
+ // MsgNotLeader is returned by Do calls that are sent to a non leading Mesos master. MsgNotLeader = "call sent to a non-leading master" - // MsgAuth is returned by Do calls that are not successfully authenticated. Deprecated. + // MsgAuth is returned by Do calls that are not successfully authenticated. MsgAuth = "call not authenticated" - // MsgUnsubscribed is returned by Do calls that are sent before a subscription is established. Deprecated. + // MsgUnsubscribed is returned by Do calls that are sent before a subscription is established. MsgUnsubscribed = "no subscription established" - // MsgVersion is returned by Do calls that are sent to an incompatible API version. Deprecated. + // MsgVersion is returned by Do calls that are sent to an incompatible API version. MsgVersion = "incompatible API version" - // MsgMalformed is returned by Do calls that are malformed. Deprecated. + // MsgMalformed is returned by Do calls that are malformed. MsgMalformed = "malformed request" - // MsgMediaType is returned by Do calls that are sent with an unsupported media type. Deprecated. + // MsgMediaType is returned by Do calls that are sent with an unsupported media type. MsgMediaType = "unsupported media type" // MsgRateLimit is returned by Do calls that are rate limited. This is a temporary condition - // that should clear. Deprecated. + // that should clear. MsgRateLimit = "rate limited" // MsgUnavailable is returned by Do calls that are sent to a master or agent that's in recovery, or - // does not yet realize that it's the leader. This is a temporary condition that should clear. Deprecated. + // does not yet realize that it's the leader. This is a temporary condition that should clear. MsgUnavailable = "mesos server unavailable" - // MsgNotFound could happen if the master or agent libprocess has not yet set up http routes. Deprecated. + // MsgNotFound could happen if the master or agent libprocess has not yet set up http routes. 
MsgNotFound = "mesos http endpoint not found" + CodeNotLeader = Code(http.StatusTemporaryRedirect) + CodeNotAuthenticated = Code(http.StatusUnauthorized) + CodeUnsubscribed = Code(http.StatusForbidden) + CodeIncompatibleVersion = Code(http.StatusConflict) + CodeMalformedRequest = Code(http.StatusBadRequest) + CodeUnsupportedMediaType = Code(http.StatusNotAcceptable) + CodeRateLimitExceded = Code(http.StatusTooManyRequests) + CodeMesosUnavailable = Code(http.StatusServiceUnavailable) + CodeNotFound = Code(http.StatusNotFound) + // ErrorTable maps HTTP response codes to their respective Mesos v1 API error messages. - ErrorTable = func() map[int]Error { - result := make(map[int]Error) - for code, msg := range map[int]string{ - http.StatusTemporaryRedirect: MsgNotLeader, - http.StatusBadRequest: MsgMalformed, - http.StatusConflict: MsgVersion, - http.StatusForbidden: MsgUnsubscribed, - http.StatusUnauthorized: MsgAuth, - http.StatusNotAcceptable: MsgMediaType, - http.StatusNotFound: MsgNotFound, - http.StatusServiceUnavailable: MsgUnavailable, - http.StatusTooManyRequests: MsgRateLimit, - } { - result[code] = Error{Code: code, Message: msg} - } - return result - }() + ErrorTable = map[Code]string{ + CodeNotLeader: MsgNotLeader, + CodeMalformedRequest: MsgMalformed, + CodeIncompatibleVersion: MsgVersion, + CodeUnsubscribed: MsgUnsubscribed, + CodeNotAuthenticated: MsgAuth, + CodeUnsupportedMediaType: MsgMediaType, + CodeNotFound: MsgNotFound, + CodeMesosUnavailable: MsgUnavailable, + CodeRateLimitExceded: MsgRateLimit, + } ) // Error captures HTTP v1 API error codes and messages generated by Mesos. 
type Error struct { - Code int // Code is the HTTP response status code generated by Mesos + Code Code // Code is the HTTP response status code generated by Mesos Message string // Message briefly summarizes the nature of the error Details string // Details captures the HTTP response entity, if any, supplied by Mesos } // IsErrorCode returns true for all HTTP status codes that are not considered informational or successful. -func IsErrorCode(code int) bool { +func (code Code) IsError() bool { return code >= 300 } @@ -68,36 +75,25 @@ func FromResponse(res *http.Response) error { return nil } - code := res.StatusCode - if !IsErrorCode(code) { + code := Code(res.StatusCode) + if !code.IsError() { // non-error HTTP response codes don't generate errors return nil } - details := "" + err := &Error{Code: code} + if res.Body != nil { + defer res.Body.Close() buf, _ := ioutil.ReadAll(res.Body) - details = string(buf) + err.Details = string(buf) } - if msg, ok := ErrorTable[code]; ok { - // return a modified copy of whatever was in the error table - msg.Details = details - return &msg - } - - // unmapped errors are OK, they're just not "interpreted" (with a Message) - return &Error{ - Code: code, - Details: details, - } + err.Message = ErrorTable[code] + return err } +// Error implements error interface func (err *Error) Error() string { return err.Message } - -func IsErrNotLeader(err error) bool { - apiErr, ok := err.(*Error) - return ok && apiErr.Code == http.StatusTemporaryRedirect -} diff --git a/api/v1/lib/httpcli/httpsched/httpsched.go b/api/v1/lib/httpcli/httpsched/httpsched.go index d09060c7..bef1e337 100644 --- a/api/v1/lib/httpcli/httpsched/httpsched.go +++ b/api/v1/lib/httpcli/httpsched/httpsched.go @@ -158,6 +158,14 @@ func (mre *mesosRedirectionError) Error() string { return "mesos server sent redirect to: " + mre.newURL } +func isErrNotLeader(err error) bool { + if err == nil { + return false + } + apiErr, ok := err.(*apierrors.Error) + return ok && apiErr.Code 
== apierrors.CodeNotLeader +} + // redirectHandler returns a config options that decorates the default response handling routine; // it transforms normal Mesos redirect "errors" into mesosRedirectionErrors by parsing the Location // header and computing the address of the next endpoint that should be used to replay the failed @@ -165,7 +173,7 @@ func (mre *mesosRedirectionError) Error() string { func (cli *client) redirectHandler() httpcli.Opt { return httpcli.HandleResponse(func(hres *http.Response, err error) (mesos.Response, error) { resp, err := cli.HandleResponse(hres, err) // default response handler - if err == nil || (err != nil && !apierrors.IsErrNotLeader(err)) { + if err == nil || !isErrNotLeader(err) { return resp, err } res, ok := resp.(*httpcli.Response) From 5df42ce5898e87129139ac94f2abc42125a07e3d Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Mon, 17 Apr 2017 14:45:33 +0000 Subject: [PATCH 14/15] transition to disconnected upon scheduler ERROR events --- api/v1/lib/httpcli/httpsched/state.go | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/api/v1/lib/httpcli/httpsched/state.go b/api/v1/lib/httpcli/httpsched/state.go index bfb882e2..510f4197 100644 --- a/api/v1/lib/httpcli/httpsched/state.go +++ b/api/v1/lib/httpcli/httpsched/state.go @@ -2,6 +2,7 @@ package httpsched import ( "errors" + "fmt" "log" "net/http" "sync" @@ -106,6 +107,13 @@ func disconnectedFn(state *state) stateFn { return disconnectedFn } + transitionToDisconnected := func() { + state.m.Lock() + defer state.m.Unlock() + state.fn = disconnectedFn + _ = stateResp.Close() // swallow any error here + } + // wrap the response: any errors processing the subscription stream should result in a // transition to a disconnected state ASAP. 
state.resp = &mesos.ResponseWrapper{ @@ -115,10 +123,17 @@ func disconnectedFn(state *state) stateFn { return func(u encoding.Unmarshaler) (err error) { err = decoder(u) if err != nil { - state.m.Lock() - defer state.m.Unlock() - state.fn = disconnectedFn - _ = stateResp.Close() // swallow any error here + transitionToDisconnected() + return + } + switch e := u.(type) { + case (*scheduler.Event): + if e.GetType() == scheduler.Event_ERROR { + transitionToDisconnected() + } + default: + err = httpcli.ProtocolError( + fmt.Sprintf("unexpected object on subscription event stream", e)) } return } From 7fd41ca0fdec858b954d12de3101b300c28b9c4f Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Mon, 17 Apr 2017 16:26:53 +0000 Subject: [PATCH 15/15] introduce DisconnectionDetector and CodeIndicatingSubscriptionLoss --- api/v1/lib/httpcli/httpsched/httpsched.go | 10 +- api/v1/lib/httpcli/httpsched/state.go | 119 ++++++++++++++++------ 2 files changed, 95 insertions(+), 34 deletions(-) diff --git a/api/v1/lib/httpcli/httpsched/httpsched.go b/api/v1/lib/httpcli/httpsched/httpsched.go index bef1e337..0f53d905 100644 --- a/api/v1/lib/httpcli/httpsched/httpsched.go +++ b/api/v1/lib/httpcli/httpsched/httpsched.go @@ -1,7 +1,6 @@ package httpsched import ( - "errors" "log" "net/http" "net/url" @@ -17,13 +16,13 @@ import ( ) var ( - errNotHTTP = errors.New("expected an HTTP object, found something else instead") - errBadLocation = errors.New("failed to build new Mesos service endpoint URL from Location header") + errNotHTTPCli = httpcli.ProtocolError("expected an httpcli.Response object, found something else instead") + errBadLocation = httpcli.ProtocolError("failed to build new Mesos service endpoint URL from Location header") DefaultRedirectSettings = RedirectSettings{ MaxAttempts: 9, MaxBackoffPeriod: 13 * time.Second, - MinBackoffPeriod: 100 * time.Millisecond, + MinBackoffPeriod: 500 * time.Millisecond, } ) @@ -176,12 +175,13 @@ func (cli *client) redirectHandler() httpcli.Opt 
{ if err == nil || !isErrNotLeader(err) { return resp, err } + // TODO(jdef) for now, we're tightly coupled to the httpcli package's Response type res, ok := resp.(*httpcli.Response) if !ok { if resp != nil { resp.Close() } - return nil, errNotHTTP + return nil, errNotHTTPCli } log.Println("master changed?") location, ok := buildNewEndpoint(res.Header.Get("Location"), cli.Endpoint()) diff --git a/api/v1/lib/httpcli/httpsched/state.go b/api/v1/lib/httpcli/httpsched/state.go index 510f4197..1339434d 100644 --- a/api/v1/lib/httpcli/httpsched/state.go +++ b/api/v1/lib/httpcli/httpsched/state.go @@ -1,7 +1,6 @@ package httpsched import ( - "errors" "fmt" "log" "net/http" @@ -10,6 +9,7 @@ import ( "github.com/mesos/mesos-go/api/v1/lib" "github.com/mesos/mesos-go/api/v1/lib/encoding" "github.com/mesos/mesos-go/api/v1/lib/httpcli" + "github.com/mesos/mesos-go/api/v1/lib/httpcli/apierrors" "github.com/mesos/mesos-go/api/v1/lib/scheduler" "github.com/mesos/mesos-go/api/v1/lib/scheduler/calls" ) @@ -19,10 +19,13 @@ const ( debug = false ) +type StateError string + +func (err StateError) Error() string { return string(err) } + var ( - errMissingMesosStreamId = errors.New("missing Mesos-Stream-Id header expected with successful SUBSCRIBE") - errAlreadySubscribed = errors.New("already subscribed, cannot re-issue a SUBSCRIBE call") - errNotSubscribed = errors.New("not yet subscribed, must first issue a SUBSCRIBE call") + errMissingMesosStreamId = httpcli.ProtocolError("missing Mesos-Stream-Id header expected with successful SUBSCRIBE") + errAlreadySubscribed = StateError("already subscribed, cannot re-issue a SUBSCRIBE call") ) type ( @@ -59,11 +62,54 @@ func maybeLogged(f httpcli.DoFunc) httpcli.DoFunc { return f } +// DisconnectionDetector is a programmable response decorator that attempts to detect errors +// that should transition the state from "connected" to "disconnected". 
Detector implementations +// are expected to invoke the `disconnect` callback in order to initiate the disconnection. +// +// The default implementation will transition to a disconnected state when: +// - an error occurs while decoding an object from the subscription stream +// - mesos reports an ERROR-type scheduler.Event object via the subscription stream +// - an object on the stream does not decode to a *scheduler.Event (sanity check) +// +// Consumers of this package may choose to override default behavior by overwriting the default +// value of this var, but should exercise caution: failure to properly transition to a disconnected +// state may cause subsequent Call operations to fail (without recourse). +var DisconnectionDetector = func(resp mesos.Response, disconnect func()) mesos.Response { + return &mesos.ResponseWrapper{ + Response: resp, + DecoderFunc: func() encoding.Decoder { + decoder := resp.Decoder() + return func(u encoding.Unmarshaler) (err error) { + err = decoder(u) + if err != nil { + disconnect() + return + } + switch e := u.(type) { + case (*scheduler.Event): + if e.GetType() == scheduler.Event_ERROR { + // the mesos scheduler API recommends that scheduler implementations + // resubscribe in this case. we initiate the disconnection here because + // it is assumed to be convenient for most framework implementations. + disconnect() + } + default: + // sanity check: this should never happen in practice. 
+ err = httpcli.ProtocolError( + fmt.Sprintf("unexpected object on subscription event stream: %v", e)) + disconnect() + } + return + } + }, + } +} + func disconnectedFn(state *state) stateFn { // (a) validate call = SUBSCRIBE if state.call.GetType() != scheduler.Call_SUBSCRIBE { state.resp = nil - state.err = errNotSubscribed + state.err = &apierrors.Error{Code: apierrors.CodeUnsubscribed} return disconnectedFn } @@ -116,29 +162,7 @@ func disconnectedFn(state *state) stateFn { // wrap the response: any errors processing the subscription stream should result in a // transition to a disconnected state ASAP. - state.resp = &mesos.ResponseWrapper{ - Response: stateResp, - DecoderFunc: func() encoding.Decoder { - decoder := stateResp.Decoder() - return func(u encoding.Unmarshaler) (err error) { - err = decoder(u) - if err != nil { - transitionToDisconnected() - return - } - switch e := u.(type) { - case (*scheduler.Event): - if e.GetType() == scheduler.Event_ERROR { - transitionToDisconnected() - } - default: - err = httpcli.ProtocolError( - fmt.Sprintf("unexpected object on subscription event stream", e)) - } - return - } - }, - } + state.resp = DisconnectionDetector(stateResp, transitionToDisconnected) // (e) else prepare callerTemporary w/ special header, return connectedFn since we're now subscribed state.caller = &callerTemporary{ @@ -148,10 +172,42 @@ func disconnectedFn(state *state) stateFn { return connectedFn } +// CodesIndicatingSubscriptionLoss is a set of apierrors.Code entries which each indicate that +// the event subscription stream has been severed between the scheduler and mesos. It's represented +// as a public map variable so that clients can program additional error codes (if such are discovered) +// without hacking the code of the mesos-go library directly. 
+var CodesIndicatingSubscriptionLoss = func(codes ...apierrors.Code) map[apierrors.Code]struct{} { + result := make(map[apierrors.Code]struct{}, len(codes)) + for _, code := range codes { + result[code] = struct{}{} + } + return result +}( + // expand this list as we discover other errors that guarantee we've lost our event subscription. + apierrors.CodeUnsubscribed, +) + +func errorIndicatesSubscriptionLoss(err error) (result bool) { + if apiError, ok := err.(*apierrors.Error); ok { + _, result = CodesIndicatingSubscriptionLoss[apiError.Code] + } + // TODO(jdef) should other error types be considered here as well? + return +} + func connectedFn(state *state) stateFn { // (a) validate call != SUBSCRIBE if state.call.GetType() == scheduler.Call_SUBSCRIBE { state.resp = nil + + // TODO(jdef) not super happy with this error: I don't think that mesos minds if we issue + // redundant subscribe calls. However, the state tracking mechanism in this module can't + // cope with it (e.g. we'll need to track a new stream-id, etc). + // We make a best effort to transition to a disconnected state if we detect protocol errors, + // error events, or mesos-generated "not subscribed" errors. But we don't handle things such + // as, for example, authentication errors. Granted, the initial subscribe call should fail + // if authentication is an issue, so we should never end up here. I'm not convinced there's + // not other edge cases though with respect to other error codes. 
state.err = errAlreadySubscribed return connectedFn } @@ -159,7 +215,12 @@ func connectedFn(state *state) stateFn { // (b) execute call, save the result in resp, err state.resp, state.err = state.caller.Call(state.call) - // stay connected, don't attempt to interpret errors here + if errorIndicatesSubscriptionLoss(state.err) { + // properly transition back to a disconnected state if mesos thinks that we're unsubscribed + return disconnectedFn + } + + // stay connected, don't attempt to interpret other errors here return connectedFn }