Skip to content

Commit

Permalink
fix: prevent acquiring broken connections from the pool (#711)
Browse files Browse the repository at this point in the history
Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian committed Dec 29, 2024
1 parent 784282a commit 3c7b197
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type pool struct {

func (p *pool) Acquire() (v wire) {
p.cond.L.Lock()
retry:
for len(p.list) == 0 && p.size == p.cap && !p.down {
p.cond.Wait()
}
Expand All @@ -51,6 +52,11 @@ func (p *pool) Acquire() (v wire) {
v = p.list[i]
p.list[i] = nil
p.list = p.list[:i]
if v.Error() != nil {
p.size--
v.Close()
goto retry
}
}
p.cond.L.Unlock()
return v
Expand Down
28 changes: 28 additions & 0 deletions pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,34 @@ func TestPool(t *testing.T) {
}
})

t.Run("Reuse without broken connections", func(t *testing.T) {
pool, count := setup(100)
c1 := pool.Acquire()
c2 := pool.Acquire()
pool.Store(c1)
pool.Store(c2)
pool.cond.L.Lock()
for _, p := range pool.list {
p.Close()
}
pool.cond.L.Unlock()
c3 := pool.Acquire()
if c3.Error() != nil {
t.Fatalf("c3.Error() is not nil")
}
if atomic.LoadInt32(count) != 3 {
t.Fatalf("pool does not clean borken connections")
}
pool.cond.L.Lock()
defer pool.cond.L.Unlock()
if pool.size != 1 {
t.Fatalf("pool size is not 1")
}
if len(pool.list) != 0 {
t.Fatalf("pool list is not empty")
}
})

t.Run("NotExceed", func(t *testing.T) {
conn := make([]wire, 100)
pool, count := setup(len(conn))
Expand Down

0 comments on commit 3c7b197

Please sign in to comment.