From 57d6b9a6640b78ba1c666c229c1f0cd6a90f71f9 Mon Sep 17 00:00:00 2001 From: Pavel Gabriel Date: Fri, 4 Oct 2024 17:37:08 +0200 Subject: [PATCH] do not stop reconnect loop if factory returns error --- pool.go | 2 +- pool_test.go | 50 +++++++++++++++++++++++++++++++++++++++++++ server/server_test.go | 2 +- 3 files changed, 52 insertions(+), 2 deletions(-) diff --git a/pool.go b/pool.go index d127c9a1..f0b844da 100644 --- a/pool.go +++ b/pool.go @@ -218,7 +218,7 @@ func (p *Pool) recreateConnection(closedConn *Connection) { conn, err := p.Factory(closedConn.addr) if err != nil { p.handleError(fmt.Errorf("failed to re-create connection for %s: %w", closedConn.addr, err)) - return + continue } // When connection is closed, remove it from the pool of connections and start diff --git a/pool_test.go b/pool_test.go index 11093d4c..c42de13b 100644 --- a/pool_test.go +++ b/pool_test.go @@ -44,8 +44,19 @@ func TestPool(t *testing.T) { } }() + var ( + shouldFactoryFail atomic.Bool + factoryCalled atomic.Int32 + ) + // And a factory method that will build connection for the pool factory := func(addr string) (*connection.Connection, error) { + // increment factoryCalled counter + factoryCalled.Add(1) + + if shouldFactoryFail.Load() { + return nil, fmt.Errorf("factory error") + } // all our addresses have same configs, but if you need to use // different TLS config, you can define config out of this // function like: @@ -151,6 +162,45 @@ func TestPool(t *testing.T) { }, 2000*time.Millisecond, 50*time.Millisecond, "expect to have one less connection") }) + t.Run("connection factory returning error should not break the reconnect loop", func(t *testing.T) { + connectionsCntBeforeServerShutdown := len(pool.Connections()) + + // when we shutdown one of the servers + servers[0].Close() + + // then we have one less connection + require.Eventually(t, func() bool { + return len(pool.Connections()) == connectionsCntBeforeServerShutdown-1 + }, 500*time.Millisecond, 50*time.Millisecond, "expect to have one less connection") + + // let factory return error + shouldFactoryFail.Store(true) + + // reset factoryCalled counter + factoryCalled.Store(0) + + // wait for the reconnect loop to call factory at least once + require.Eventually(t, func() bool { + return factoryCalled.Load() > 0 + }, 1000*time.Millisecond, 50*time.Millisecond, "expect factory to be called at least once") + + // stop returning error for factory + shouldFactoryFail.Store(false) + + // when we start server again + server, err := NewTestServerWithAddr(servers[0].Addr) + require.NoError(t, err) + + // so we will not forget to close server on exit + servers[0] = server + + // then we have one more connection (the same as it was before + // we shut down the server) + require.Eventually(t, func() bool { + return len(pool.Connections()) == connectionsCntBeforeServerShutdown + }, 2000*time.Millisecond, 50*time.Millisecond, "expect to have one less connection") + }) + t.Run("when MaxReconnectWait is set reconnect wait time will exponentially increase", func(t *testing.T) { // Context: MaxReconnectWait is set to 400ms and initial // ReconnectWait is 100ms. The server is offline, so the pool diff --git a/server/server_test.go b/server/server_test.go index dbcf8dce..23d4a684 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -101,7 +101,7 @@ func TestServer_WithConnectionFactory(t *testing.T) { require.Eventually(t, func() bool { return isCalled.Load() == true - }, 100*time.Millisecond, 10*time.Millisecond) + }, 200*time.Millisecond, 10*time.Millisecond) require.ErrorIs(t, gotErr, expectedErr) })