diff --git a/toxics/debug.test b/toxics/debug.test new file mode 100644 index 00000000..b38851a6 Binary files /dev/null and b/toxics/debug.test differ diff --git a/toxics/limit_time.go b/toxics/limit_time.go new file mode 100644 index 00000000..b252bf84 --- /dev/null +++ b/toxics/limit_time.go @@ -0,0 +1,48 @@ +package toxics + +import "time" + +// LimitTimeToxic has shuts connection after given time +type LimitTimeToxic struct { + Time int64 `json:"time"` +} + +type LimitTimeToxicState struct { + ElapsedMilliseconds int64 +} + +func (t *LimitTimeToxic) Pipe(stub *ToxicStub) { + state := stub.State.(*LimitTimeToxicState) + + if state.ElapsedMilliseconds >= t.Time { + stub.Close() + return + } + + timeout := time.Duration(t.Time-state.ElapsedMilliseconds) * time.Millisecond + start := time.Now() + for { + select { + case <-time.After(timeout): + stub.Close() + return + case <-stub.Interrupt: + state.ElapsedMilliseconds = int64(time.Since(start) / time.Millisecond) + return + case c := <-stub.Input: + if c == nil { + stub.Close() + return + } + stub.Output <- c + } + } +} + +func (t *LimitTimeToxic) NewState() interface{} { + return new(LimitTimeToxicState) +} + +func init() { + Register("limit_time", new(LimitTimeToxic)) +} diff --git a/toxics/limit_time_test.go b/toxics/limit_time_test.go new file mode 100644 index 00000000..7b5c7a20 --- /dev/null +++ b/toxics/limit_time_test.go @@ -0,0 +1,96 @@ +package toxics_test + +import ( + "bytes" + "testing" + "time" + + "github.com/Shopify/toxiproxy/stream" + "github.com/Shopify/toxiproxy/toxics" +) + +func TestLimitTimeToxicContinuesAfterInterrupt(t *testing.T) { + timeout := int64(1000) + toxic := &toxics.LimitTimeToxic{Time: timeout} + + input := make(chan *stream.StreamChunk) + output := make(chan *stream.StreamChunk) + stub := toxics.NewToxicStub(input, output) + stub.State = toxic.NewState() + + // Wait for half the timeout and interrupt + go func() { + time.Sleep(time.Duration(timeout/2) * time.Millisecond) + stub.Interrupt <- struct{}{} + }() + + start := time.Now() + toxic.Pipe(stub) + elapsed1 := time.Since(start) + if int64(elapsed1/time.Millisecond) >= timeout { + t.Error("Interrupt did not immediately return from pipe") + } + + // Without sending anything then pipe should wait for remainder of timeout and close stub + toxic.Pipe(stub) + elapsedTotal := time.Since(start) + + if int64(elapsedTotal/time.Millisecond) > int64((float64(timeout) * 1.1)) { + t.Error("Timeout started again after interrupt") + } + + if int64(elapsedTotal/time.Millisecond) < timeout { + t.Error("Did not wait for timeout to elapse") + } + + if !stub.Closed() { + t.Error("Did not close pipe after timeout") + } +} + +func TestLimitTimeToxicNilInputShouldClosePipe(t *testing.T) { + timeout := int64(30000) + toxic := &toxics.LimitTimeToxic{Time: timeout} + + input := make(chan *stream.StreamChunk) + output := make(chan *stream.StreamChunk) + stub := toxics.NewToxicStub(input, output) + stub.State = toxic.NewState() + + go func() { + input <- nil + }() + + start := time.Now() + toxic.Pipe(stub) + elapsed1 := time.Since(start) + if int64(elapsed1/time.Millisecond) >= timeout { + t.Error("Did not immediately close pipe") + } + + if !stub.Closed() { + t.Error("Did not close pipe") + } + +} + +func TestLimitTimeToxicSendsDataThroughBeforeTimeoutReached(t *testing.T) { + timeout := int64(30000) + toxic := &toxics.LimitTimeToxic{Time: timeout} + + input := make(chan *stream.StreamChunk) + output := make(chan *stream.StreamChunk) + stub := toxics.NewToxicStub(input, output) + stub.State = toxic.NewState() + + go toxic.Pipe(stub) + + inputBuffer := buffer(100) + input <- &stream.StreamChunk{Data: inputBuffer} + + sentData := <-output + + if !bytes.Equal(sentData.Data, inputBuffer) { + t.Error("Data did not get sent through") + } +}