diff --git a/net/redisclient.go b/net/redisclient.go index 09c83eafb7..dc5420bacb 100644 --- a/net/redisclient.go +++ b/net/redisclient.go @@ -359,7 +359,9 @@ func (r *RedisRingClient) Close() { r.once.Do(func() { r.closed = true close(r.quit) - r.ring.Close() + if r.ring != nil { + r.ring.Close() + } }) } diff --git a/proxy/backendtimeout_test.go b/proxy/backendtimeout_test.go index 48cbec4f49..83734688ba 100644 --- a/proxy/backendtimeout_test.go +++ b/proxy/backendtimeout_test.go @@ -34,6 +34,7 @@ func TestSlowService(t *testing.T) { if err != nil { t.Fatal(err) } + defer rsp.Body.Close() if rsp.StatusCode != http.StatusGatewayTimeout { t.Errorf("expected 504, got: %v", rsp) @@ -60,6 +61,7 @@ func TestFastService(t *testing.T) { if err != nil { t.Fatal(err) } + defer rsp.Body.Close() if rsp.StatusCode != http.StatusOK { t.Errorf("expected 200, got: %v", rsp) @@ -94,6 +96,7 @@ func TestBackendTimeoutInTheMiddleOfServiceResponse(t *testing.T) { if err != nil { t.Fatal(err) } + defer rsp.Body.Close() if rsp.StatusCode != http.StatusOK { t.Errorf("expected 200, got: %v", rsp) @@ -166,6 +169,7 @@ func TestRetryAndSlowService(t *testing.T) { if err != nil { t.Fatal(err) } + defer rsp.Body.Close() if rsp.StatusCode != http.StatusGatewayTimeout { t.Errorf("expected 504, got: %v", rsp) @@ -196,6 +200,7 @@ func TestRetryAndFastService(t *testing.T) { if err != nil { t.Fatal(err) } + defer rsp.Body.Close() if rsp.StatusCode != http.StatusOK { t.Errorf("expected 200, got: %v", rsp) diff --git a/proxy/fadeintesting_test.go b/proxy/fadeintesting_test.go index cc6b3073b7..4c3f6f7373 100644 --- a/proxy/fadeintesting_test.go +++ b/proxy/fadeintesting_test.go @@ -40,6 +40,7 @@ const ( type fadeInDataClient struct { reset, update chan []*eskip.Route + quit chan struct{} } type fadeInBackendInstance struct { @@ -56,8 +57,9 @@ type fadeInBackend struct { } type fadeInProxyInstance struct { - proxy *proxy.Proxy - server *httptest.Server + routing *routing.Routing + proxy *proxy.Proxy + server *httptest.Server } type fadeInProxy struct { @@ -107,18 +109,33 @@ func createDataClient(r ...*eskip.Route) fadeInDataClient { var c fadeInDataClient c.reset = make(chan []*eskip.Route, 1) c.update = make(chan []*eskip.Route, 1) + c.quit = make(chan struct{}) c.reset <- r return c } func (c fadeInDataClient) LoadAll() ([]*eskip.Route, error) { - r := <-c.reset - c.reset <- r - return r, nil + select { + case r := <-c.reset: + c.reset <- r + return r, nil + case <-c.quit: + return nil, nil + } } func (c fadeInDataClient) LoadUpdate() ([]*eskip.Route, []string, error) { - return <-c.update, nil, nil + select { + // hm, blocking dataclient? + case r := <-c.update: + return r, nil, nil + case <-c.quit: + return nil, nil, nil + } +} + +func (c fadeInDataClient) close() { + close(c.quit) } // startBackend starts a backend representing 0 or more endpoints, added in a separate step. @@ -219,11 +236,15 @@ func (b *fadeInBackend) close() { for _, i := range b.instances { i.server.Close() } + for _, c := range b.clients { + c.close() + } } func (p *fadeInProxyInstance) close() { p.server.Close() p.proxy.Close() + p.routing.Close() } // startProxy starts a proxy representing 0 or more proxy instances, added in a separate step. @@ -259,8 +280,9 @@ func (p *fadeInProxy) addInstances(n int) { s := httptest.NewServer(px) p.instances = append(p.instances, &fadeInProxyInstance{ - proxy: px, - server: s, + routing: rt, + proxy: px, + server: s, }) } } diff --git a/proxy/longpoll_test.go b/proxy/longpoll_test.go index 48dc5a573d..f5f8d9a80e 100644 --- a/proxy/longpoll_test.go +++ b/proxy/longpoll_test.go @@ -148,7 +148,6 @@ func expectSuccessfulResponse(t *testing.T, rsp roundTripResponse) { } t.Errorf("failed to make request: %v", rsp.err) - rsp.response.Body.Close() } func expectErrorResponse(t *testing.T, rsp roundTripResponse) { @@ -250,6 +249,8 @@ func testCancelDuringStreamingRequestBody(t *testing.T, p initProxy) { cancelRequest() select { case <-backendDetectedRequestCancel: + body.Close() // prevent Transport.RoundTrip hang on reading request + expectErrorResponse(t, <-responseReceived) case <-time.After(detectionTimeout): t.Error("failed to detect canceled request on time") @@ -377,10 +378,10 @@ func testCancelDuringStreamingResponseBody(t *testing.T, p initProxy) { func TestNotifyBackendOnClosedClient(t *testing.T) { scenarios := map[string]testScenario{ - "before response received": testCancelBeforeResponseReceived, - "during streaming request body": testCancelDuringStreamingRequestBody, - "after response received": testCancelAfterResponseReceived, - "duing streaming response body": testCancelDuringStreamingResponseBody, + "before response received": testCancelBeforeResponseReceived, + "during streaming request body": testCancelDuringStreamingRequestBody, + "after response received": testCancelAfterResponseReceived, + "during streaming response body": testCancelDuringStreamingResponseBody, } proxyVariants := map[string]initProxy{ diff --git a/proxy/loopback_test.go b/proxy/loopback_test.go index 6712554380..9193348c26 100644 --- a/proxy/loopback_test.go +++ b/proxy/loopback_test.go @@ -87,6 +87,7 @@ func testLoopback( w.Header().Set("X-Backend-Done", "true") })) + defer backend.Close() routes = strings.ReplaceAll(routes, "$backend", backend.URL) diff --git a/proxy/main_test.go b/proxy/main_test.go new file mode 100644 index 0000000000..60248195b7 --- /dev/null +++ b/proxy/main_test.go @@ -0,0 +1,12 @@ +package proxy + +import ( + "os" + "testing" + + "github.com/AlexanderYastrebov/noleak" +) + +func TestMain(m *testing.M) { + os.Exit(noleak.CheckMain(m)) +} diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go index 5b2b64d9c0..a386d5a6d7 100644 --- a/proxy/proxy_test.go +++ b/proxy/proxy_test.go @@ -61,6 +61,7 @@ type syncResponseWriter struct { type testProxy struct { log *loggingtest.Logger + dc *testdataclient.Client routing *routing.Routing proxy *Proxy } @@ -162,7 +163,7 @@ func newTestProxyWithFiltersAndParams(fr filters.Registry, doc string, params Pa return nil, err } - return &testProxy{tl, rt, p}, nil + return &testProxy{tl, dc, rt, p}, nil } func newTestProxyWithFilters(fr filters.Registry, doc string, flags Flags, pr ...PriorityRoute) (*testProxy, error) { @@ -183,6 +184,7 @@ func newTestProxy(doc string, flags Flags, pr ...PriorityRoute) (*testProxy, err func (tp *testProxy) close() { tp.log.Close() + tp.dc.Close() tp.routing.Close() tp.proxy.Close() } @@ -1073,7 +1075,7 @@ func TestProcessesRequestWithPriorityRouteOverStandard(t *testing.T) { s1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("X-Test-Header", "normal-value") })) - defer s0.Close() + defer s1.Close() req, err := http.NewRequest( "GET", @@ -1129,6 +1131,7 @@ func TestFlusherImplementation(t *testing.T) { a := fmt.Sprintf(":%d", 1<<16-rand.Intn(1<<15)) ps := &http.Server{Addr: a, Handler: tp.proxy} + defer ps.Close() go ps.ListenAndServe() // let the server start listening @@ -1339,6 +1342,7 @@ func TestHostHeader(t *testing.T) { closeAll() continue } + defer rsp.Body.Close() if ti.flags.Debug() { closeAll() @@ -1362,8 +1366,7 @@ func TestBackendServiceUnavailable(t *testing.T) { t.Error(err) return } - - defer p.proxy.Close() + defer p.close() ps := httptest.NewServer(p.proxy) defer ps.Close() @@ -1471,7 +1474,7 @@ func TestResponseHeaderTimeout(t *testing.T) { if err != nil { t.Fatal(err) } - defer p.proxy.Close() + defer p.close() ps := httptest.NewServer(p.proxy) defer ps.Close() @@ -1554,8 +1557,7 @@ func TestBranding(t *testing.T) { t.Error(err) return } - - defer p.proxy.Close() + defer p.close() ps := httptest.NewServer(p.proxy) defer ps.Close() @@ -1618,8 +1620,7 @@ func TestFixNoAppLogFor404(t *testing.T) { t.Error(err) return } - - defer p.proxy.Close() + defer p.close() ps := httptest.NewServer(p.proxy) defer ps.Close() @@ -1677,14 +1678,14 @@ func TestRequestContentHeaders(t *testing.T) { return } })) + defer backend.Close() p, err := newTestProxy(fmt.Sprintf(`* -> "%s"`, backend.URL), FlagsNone) if err != nil { t.Error(err) return } - - defer p.proxy.Close() + defer p.close() ps := httptest.NewServer(p.proxy) defer ps.Close() @@ -1727,12 +1728,14 @@ func TestSettingDefaultHTTPStatus(t *testing.T) { DefaultHTTPStatus: http.StatusBadGateway, } p := WithParams(params) + p.Close() if p.defaultHTTPStatus != http.StatusBadGateway { t.Errorf("expected default HTTP status %d, got %d", http.StatusBadGateway, p.defaultHTTPStatus) } params.DefaultHTTPStatus = http.StatusNetworkAuthenticationRequired + 1 p = WithParams(params) + p.Close() if p.defaultHTTPStatus != http.StatusNotFound { t.Errorf("expected default HTTP status %d, got %d", http.StatusNotFound, p.defaultHTTPStatus) } @@ -1989,8 +1992,7 @@ func TestAccessLogOnFailedRequest(t *testing.T) { t.Error(err) return } - - defer p.proxy.Close() + defer p.close() ps := httptest.NewServer(p.proxy) defer ps.Close() diff --git a/proxy/ratelimit_test.go b/proxy/ratelimit_test.go index 9971c0e02a..f1e52a94ba 100644 --- a/proxy/ratelimit_test.go +++ b/proxy/ratelimit_test.go @@ -31,14 +31,17 @@ func TestCheckDisableRateLimit(t *testing.T) { backend := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {})) defer backend.Close() r := []*eskip.Route{{Backend: backend.URL}} + reg := ratelimit.NewRegistry(ratelimit.Settings{ + Type: ratelimit.DisableRatelimit, + MaxHits: 10, + TimeWindow: 1 * time.Second, + CleanInterval: 2 * time.Second, + }) + defer reg.Close() + p := proxytest.WithParams(fr, proxy.Params{ CloseIdleConnsPeriod: -time.Second, - RateLimiters: ratelimit.NewRegistry(ratelimit.Settings{ - Type: ratelimit.DisableRatelimit, - MaxHits: 10, - TimeWindow: 1 * time.Second, - CleanInterval: 2 * time.Second, - }), + RateLimiters: reg, }, r...) defer p.Close() @@ -57,9 +60,12 @@ func TestCheckLocalRateLimitForShuntRoutes(t *testing.T) { TimeWindow: timeWindow, CleanInterval: 2 * timeWindow, } + reg := ratelimit.NewRegistry(ratelimitSettings) + defer reg.Close() + p := proxytest.WithParams(fr, proxy.Params{ CloseIdleConnsPeriod: -time.Second, - RateLimiters: ratelimit.NewRegistry(ratelimitSettings), + RateLimiters: reg, }, r...) defer p.Close() @@ -86,9 +92,12 @@ func TestCheckLocalRateLimit(t *testing.T) { TimeWindow: timeWindow, CleanInterval: 2 * timeWindow, } + reg := ratelimit.NewRegistry(ratelimitSettings) + defer reg.Close() + p := proxytest.WithParams(fr, proxy.Params{ CloseIdleConnsPeriod: -time.Second, - RateLimiters: ratelimit.NewRegistry(ratelimitSettings), + RateLimiters: reg, }, r...) defer p.Close() @@ -114,9 +123,12 @@ func TestCheckServiceRateLimit(t *testing.T) { MaxHits: 3, TimeWindow: timeWindow, } + reg := ratelimit.NewRegistry(ratelimitSettings) + defer reg.Close() + p := proxytest.WithParams(fr, proxy.Params{ CloseIdleConnsPeriod: -time.Second, - RateLimiters: ratelimit.NewRegistry(ratelimitSettings), + RateLimiters: reg, }, r...) defer p.Close() @@ -143,9 +155,12 @@ func TestRetryAfterHeader(t *testing.T) { MaxHits: 1, TimeWindow: timeWindow, } + reg := ratelimit.NewRegistry(ratelimitSettings) + defer reg.Close() + p := proxytest.WithParams(fr, proxy.Params{ CloseIdleConnsPeriod: -time.Second, - RateLimiters: ratelimit.NewRegistry(ratelimitSettings), + RateLimiters: reg, }, r...) defer p.Close() diff --git a/proxy/upgrade_test.go b/proxy/upgrade_test.go index 0e60a48589..8d1838e7a0 100644 --- a/proxy/upgrade_test.go +++ b/proxy/upgrade_test.go @@ -160,10 +160,7 @@ func TestServeHTTP(t *testing.T) { } { t.Run(ti.msg, func(t *testing.T) { ti := ti // trick race detector - var clientConnClosed atomic.Value - clientAlive := func() bool { - return clientConnClosed.Load() == nil - } + var clientConnClosed atomic.Bool backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if ti.backendClosesConnection { @@ -184,12 +181,15 @@ func TestServeHTTP(t *testing.T) { conn, bufrw, err := hj.Hijack() require.NoError(t, err) + // Closing server does not close hijacked connections so do it explicitly defer conn.Close() for { s, err := bufrw.ReadString('\n') - if err != nil && clientAlive() { - t.Errorf("error reading string: %v", err) + if err != nil { + if !clientConnClosed.Load() { + t.Error(err) + } return } @@ -205,13 +205,17 @@ func TestServeHTTP(t *testing.T) { } _, err = bufrw.WriteString(resp) - if err != nil && clientAlive() { - t.Error(err) + if err != nil { + if !clientConnClosed.Load() { + t.Error(err) + } return } err = bufrw.Flush() - if err != nil && clientAlive() { - t.Error(err) + if err != nil { + if !clientConnClosed.Load() { + t.Error(err) + } return } } @@ -349,11 +353,14 @@ func TestAuditLogging(t *testing.T) { // only used as poor man's sync, the audit log in question goes stdout and stderr, // see below tl := loggingtest.New() + defer tl.Close() + + dc := testdataclient.New([]*eskip.Route{{Backend: wss.URL}}) + defer dc.Close() + rt := routing.New(routing.Options{ - DataClients: []routing.DataClient{ - testdataclient.New([]*eskip.Route{{Backend: wss.URL}}), - }, - Log: tl, + DataClients: []routing.DataClient{dc}, + Log: tl, }) defer rt.Close()