Skip to content

Commit

Permalink
proxy: fix goroutine leaks in tests (#2170)
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Yastrebov <[email protected]>

Signed-off-by: Alexander Yastrebov <[email protected]>
  • Loading branch information
AlexanderYastrebov authored Dec 20, 2022
1 parent 211b0ec commit 13c1b5e
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 51 deletions.
4 changes: 3 additions & 1 deletion net/redisclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,9 @@ func (r *RedisRingClient) Close() {
r.once.Do(func() {
r.closed = true
close(r.quit)
r.ring.Close()
if r.ring != nil {
r.ring.Close()
}
})
}

Expand Down
5 changes: 5 additions & 0 deletions proxy/backendtimeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func TestSlowService(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer rsp.Body.Close()

if rsp.StatusCode != http.StatusGatewayTimeout {
t.Errorf("expected 504, got: %v", rsp)
Expand All @@ -60,6 +61,7 @@ func TestFastService(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer rsp.Body.Close()

if rsp.StatusCode != http.StatusOK {
t.Errorf("expected 200, got: %v", rsp)
Expand Down Expand Up @@ -94,6 +96,7 @@ func TestBackendTimeoutInTheMiddleOfServiceResponse(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer rsp.Body.Close()

if rsp.StatusCode != http.StatusOK {
t.Errorf("expected 200, got: %v", rsp)
Expand Down Expand Up @@ -166,6 +169,7 @@ func TestRetryAndSlowService(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer rsp.Body.Close()

if rsp.StatusCode != http.StatusGatewayTimeout {
t.Errorf("expected 504, got: %v", rsp)
Expand Down Expand Up @@ -196,6 +200,7 @@ func TestRetryAndFastService(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer rsp.Body.Close()

if rsp.StatusCode != http.StatusOK {
t.Errorf("expected 200, got: %v", rsp)
Expand Down
38 changes: 30 additions & 8 deletions proxy/fadeintesting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (

type fadeInDataClient struct {
reset, update chan []*eskip.Route
quit chan struct{}
}

type fadeInBackendInstance struct {
Expand All @@ -56,8 +57,9 @@ type fadeInBackend struct {
}

type fadeInProxyInstance struct {
proxy *proxy.Proxy
server *httptest.Server
routing *routing.Routing
proxy *proxy.Proxy
server *httptest.Server
}

type fadeInProxy struct {
Expand Down Expand Up @@ -107,18 +109,33 @@ func createDataClient(r ...*eskip.Route) fadeInDataClient {
var c fadeInDataClient
c.reset = make(chan []*eskip.Route, 1)
c.update = make(chan []*eskip.Route, 1)
c.quit = make(chan struct{})
c.reset <- r
return c
}

func (c fadeInDataClient) LoadAll() ([]*eskip.Route, error) {
r := <-c.reset
c.reset <- r
return r, nil
select {
case r := <-c.reset:
c.reset <- r
return r, nil
case <-c.quit:
return nil, nil
}
}

func (c fadeInDataClient) LoadUpdate() ([]*eskip.Route, []string, error) {
return <-c.update, nil, nil
select {
// hm, blocking dataclient?
case r := <-c.update:
return r, nil, nil
case <-c.quit:
return nil, nil, nil
}
}

func (c fadeInDataClient) close() {
close(c.quit)
}

// startBackend starts a backend representing 0 or more endpoints, added in a separate step.
Expand Down Expand Up @@ -219,11 +236,15 @@ func (b *fadeInBackend) close() {
for _, i := range b.instances {
i.server.Close()
}
for _, c := range b.clients {
c.close()
}
}

func (p *fadeInProxyInstance) close() {
p.server.Close()
p.proxy.Close()
p.routing.Close()
}

// startProxy starts a proxy representing 0 or more proxy instances, added in a separate step.
Expand Down Expand Up @@ -259,8 +280,9 @@ func (p *fadeInProxy) addInstances(n int) {

s := httptest.NewServer(px)
p.instances = append(p.instances, &fadeInProxyInstance{
proxy: px,
server: s,
routing: rt,
proxy: px,
server: s,
})
}
}
Expand Down
11 changes: 6 additions & 5 deletions proxy/longpoll_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ func expectSuccessfulResponse(t *testing.T, rsp roundTripResponse) {
}

t.Errorf("failed to make request: %v", rsp.err)
rsp.response.Body.Close()
}

func expectErrorResponse(t *testing.T, rsp roundTripResponse) {
Expand Down Expand Up @@ -250,6 +249,8 @@ func testCancelDuringStreamingRequestBody(t *testing.T, p initProxy) {
cancelRequest()
select {
case <-backendDetectedRequestCancel:
body.Close() // prevent Transport.RoundTrip hang on reading request
expectErrorResponse(t, <-responseReceived)
case <-time.After(detectionTimeout):
t.Error("failed to detect canceled request on time")

Expand Down Expand Up @@ -377,10 +378,10 @@ func testCancelDuringStreamingResponseBody(t *testing.T, p initProxy) {

func TestNotifyBackendOnClosedClient(t *testing.T) {
scenarios := map[string]testScenario{
"before response received": testCancelBeforeResponseReceived,
"during streaming request body": testCancelDuringStreamingRequestBody,
"after response received": testCancelAfterResponseReceived,
"duing streaming response body": testCancelDuringStreamingResponseBody,
"before response received": testCancelBeforeResponseReceived,
"during streaming request body": testCancelDuringStreamingRequestBody,
"after response received": testCancelAfterResponseReceived,
"during streaming response body": testCancelDuringStreamingResponseBody,
}

proxyVariants := map[string]initProxy{
Expand Down
1 change: 1 addition & 0 deletions proxy/loopback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func testLoopback(

w.Header().Set("X-Backend-Done", "true")
}))
defer backend.Close()

routes = strings.ReplaceAll(routes, "$backend", backend.URL)

Expand Down
12 changes: 12 additions & 0 deletions proxy/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package proxy

import (
"os"
"testing"

"github.com/AlexanderYastrebov/noleak"
)

func TestMain(m *testing.M) {
os.Exit(noleak.CheckMain(m))
}
28 changes: 15 additions & 13 deletions proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type syncResponseWriter struct {

type testProxy struct {
log *loggingtest.Logger
dc *testdataclient.Client
routing *routing.Routing
proxy *Proxy
}
Expand Down Expand Up @@ -162,7 +163,7 @@ func newTestProxyWithFiltersAndParams(fr filters.Registry, doc string, params Pa
return nil, err
}

return &testProxy{tl, rt, p}, nil
return &testProxy{tl, dc, rt, p}, nil
}

func newTestProxyWithFilters(fr filters.Registry, doc string, flags Flags, pr ...PriorityRoute) (*testProxy, error) {
Expand All @@ -183,6 +184,7 @@ func newTestProxy(doc string, flags Flags, pr ...PriorityRoute) (*testProxy, err

func (tp *testProxy) close() {
tp.log.Close()
tp.dc.Close()
tp.routing.Close()
tp.proxy.Close()
}
Expand Down Expand Up @@ -1073,7 +1075,7 @@ func TestProcessesRequestWithPriorityRouteOverStandard(t *testing.T) {
s1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-Test-Header", "normal-value")
}))
defer s0.Close()
defer s1.Close()

req, err := http.NewRequest(
"GET",
Expand Down Expand Up @@ -1129,6 +1131,7 @@ func TestFlusherImplementation(t *testing.T) {

a := fmt.Sprintf(":%d", 1<<16-rand.Intn(1<<15))
ps := &http.Server{Addr: a, Handler: tp.proxy}
defer ps.Close()
go ps.ListenAndServe()

// let the server start listening
Expand Down Expand Up @@ -1339,6 +1342,7 @@ func TestHostHeader(t *testing.T) {
closeAll()
continue
}
defer rsp.Body.Close()

if ti.flags.Debug() {
closeAll()
Expand All @@ -1362,8 +1366,7 @@ func TestBackendServiceUnavailable(t *testing.T) {
t.Error(err)
return
}

defer p.proxy.Close()
defer p.close()

ps := httptest.NewServer(p.proxy)
defer ps.Close()
Expand Down Expand Up @@ -1471,7 +1474,7 @@ func TestResponseHeaderTimeout(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer p.proxy.Close()
defer p.close()

ps := httptest.NewServer(p.proxy)
defer ps.Close()
Expand Down Expand Up @@ -1554,8 +1557,7 @@ func TestBranding(t *testing.T) {
t.Error(err)
return
}

defer p.proxy.Close()
defer p.close()

ps := httptest.NewServer(p.proxy)
defer ps.Close()
Expand Down Expand Up @@ -1618,8 +1620,7 @@ func TestFixNoAppLogFor404(t *testing.T) {
t.Error(err)
return
}

defer p.proxy.Close()
defer p.close()

ps := httptest.NewServer(p.proxy)
defer ps.Close()
Expand Down Expand Up @@ -1677,14 +1678,14 @@ func TestRequestContentHeaders(t *testing.T) {
return
}
}))
defer backend.Close()

p, err := newTestProxy(fmt.Sprintf(`* -> "%s"`, backend.URL), FlagsNone)
if err != nil {
t.Error(err)
return
}

defer p.proxy.Close()
defer p.close()

ps := httptest.NewServer(p.proxy)
defer ps.Close()
Expand Down Expand Up @@ -1727,12 +1728,14 @@ func TestSettingDefaultHTTPStatus(t *testing.T) {
DefaultHTTPStatus: http.StatusBadGateway,
}
p := WithParams(params)
p.Close()
if p.defaultHTTPStatus != http.StatusBadGateway {
t.Errorf("expected default HTTP status %d, got %d", http.StatusBadGateway, p.defaultHTTPStatus)
}

params.DefaultHTTPStatus = http.StatusNetworkAuthenticationRequired + 1
p = WithParams(params)
p.Close()
if p.defaultHTTPStatus != http.StatusNotFound {
t.Errorf("expected default HTTP status %d, got %d", http.StatusNotFound, p.defaultHTTPStatus)
}
Expand Down Expand Up @@ -1989,8 +1992,7 @@ func TestAccessLogOnFailedRequest(t *testing.T) {
t.Error(err)
return
}

defer p.proxy.Close()
defer p.close()

ps := httptest.NewServer(p.proxy)
defer ps.Close()
Expand Down
Loading

0 comments on commit 13c1b5e

Please sign in to comment.