feat: improve handling of LOADING response (#657)
* feat: improve handling of LOADING errors

These changes make the client more resilient during node restarts and initial
data loading by treating LOADING errors, which a Redis node returns while it is
still loading its dataset from persistence, as retryable instead of surfacing
them to the caller right away.

Signed-off-by: Ernesto Alejandro Santana Hidalgo <[email protected]>

* perf: skip retry check on Nil response

---------

Signed-off-by: Ernesto Alejandro Santana Hidalgo <[email protected]>
Co-authored-by: Rueian <[email protected]>
Signed-off-by: Rueian <[email protected]>
nesty92 and rueian committed Nov 8, 2024
1 parent 5d0bf0e commit 016fdfe
Showing 7 changed files with 582 additions and 15 deletions.
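For context on what the change means for callers: a read-only command sent while a node is still loading its dataset is now retried by the client rather than failing immediately with the LOADING error. Below is a minimal usage sketch, assuming the github.com/valkey-io/valkey-go module path, package name valkey, and a node at 127.0.0.1:6379; these details are assumptions for illustration, not part of this commit.

```go
package main

import (
	"context"
	"fmt"

	valkey "github.com/valkey-io/valkey-go" // assumed module path
)

func main() {
	// Default options keep command retries enabled.
	client, err := valkey.NewClient(valkey.ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// If the node replies "LOADING ...", the client now waits (per its retry
	// handler) and re-sends this read-only GET instead of returning the error.
	v, err := client.Do(context.Background(), client.B().Get().Key("test").Build()).ToString()
	fmt.Println(v, err)
}
```

Per the diff, the retry only applies when retries are enabled and the command is read-only; otherwise the LOADING error is still returned to the caller.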
28 changes: 20 additions & 8 deletions client.go
@@ -47,7 +47,7 @@ func (c *singleClient) Do(ctx context.Context, cmd Completed) (resp ValkeyResult
attempts := 1
retry:
resp = c.conn.Do(ctx, cmd)
- if c.retry && cmd.IsReadOnly() && c.isRetryable(resp.NonValkeyError(), ctx) {
+ if c.retry && cmd.IsReadOnly() && c.isRetryable(resp.Error(), ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, cmd, resp.Error(),
)
@@ -88,7 +88,7 @@ retry:
resps = c.conn.DoMulti(ctx, multi...).s
if c.retry && allReadOnly(multi) {
for i, resp := range resps {
- if c.isRetryable(resp.NonValkeyError(), ctx) {
+ if c.isRetryable(resp.Error(), ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, multi[i], resp.Error(),
)
@@ -116,7 +116,7 @@ retry:
resps = c.conn.DoMultiCache(ctx, multi...).s
if c.retry {
for i, resp := range resps {
- if c.isRetryable(resp.NonValkeyError(), ctx) {
+ if c.isRetryable(resp.Error(), ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, Completed(multi[i].Cmd), resp.Error(),
)
@@ -139,7 +139,7 @@ func (c *singleClient) DoCache(ctx context.Context, cmd Cacheable, ttl time.Dura
attempts := 1
retry:
resp = c.conn.DoCache(ctx, cmd, ttl)
- if c.retry && c.isRetryable(resp.NonValkeyError(), ctx) {
+ if c.retry && c.isRetryable(resp.Error(), ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, Completed(cmd), resp.Error())
if shouldRetry {
attempts++
@@ -214,7 +214,7 @@ retry:
return newErrResult(err)
}
resp = c.wire.Do(ctx, cmd)
- if c.retry && cmd.IsReadOnly() && isRetryable(resp.NonValkeyError(), c.wire, ctx) {
+ if c.retry && cmd.IsReadOnly() && isRetryable(resp.Error(), c.wire, ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, cmd, resp.Error(),
)
@@ -244,7 +244,7 @@ retry:
}
resp = c.wire.DoMulti(ctx, multi...).s
for i, cmd := range multi {
- if retryable && isRetryable(resp[i].NonValkeyError(), c.wire, ctx) {
+ if retryable && isRetryable(resp[i].Error(), c.wire, ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, multi[i], resp[i].Error(),
)
@@ -312,11 +312,23 @@ func (c *dedicatedSingleClient) release() {
}

func (c *singleClient) isRetryable(err error, ctx context.Context) bool {
- return err != nil && err != ErrDoCacheAborted && atomic.LoadUint32(&c.stop) == 0 && ctx.Err() == nil
+ if err == nil || err == Nil || err == ErrDoCacheAborted || atomic.LoadUint32(&c.stop) != 0 || ctx.Err() != nil {
+ return false
+ }
+ if err, ok := err.(*ValkeyError); ok {
+ return err.IsLoading()
+ }
+ return true
}

func isRetryable(err error, w wire, ctx context.Context) bool {
- return err != nil && w.Error() == nil && ctx.Err() == nil
+ if err == nil || err == Nil || w.Error() != nil || ctx.Err() != nil {
+ return false
+ }
+ if err, ok := err.(*ValkeyError); ok {
+ return err.IsLoading()
+ }
+ return true
}

func allReadOnly(multi []Completed) bool {
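Taken together, the client.go hunks above give singleClient.Do the following retry shape. This is a simplified paraphrase of the in-package loop (hypothetical method name, indentation and surrounding details omitted), not the verbatim source:

```go
// Simplified paraphrase of the retry loop in singleClient.Do after this change.
// isRetryable now returns false for nil errors, Nil, ErrDoCacheAborted, a stopped
// client, a canceled context, and ordinary Valkey errors; it returns true for
// non-Valkey (e.g. transport) errors and for a *ValkeyError whose IsLoading() is true.
func (c *singleClient) doWithRetry(ctx context.Context, cmd Completed) (resp ValkeyResult) {
	attempts := 1
retry:
	resp = c.conn.Do(ctx, cmd)
	if c.retry && cmd.IsReadOnly() && c.isRetryable(resp.Error(), ctx) {
		if c.retryHandler.WaitOrSkipRetry(ctx, attempts, cmd, resp.Error()) {
			attempts++
			goto retry
		}
	}
	return resp
}
```

The DoMulti, DoCache, and DoMultiCache paths follow the same pattern, as do the dedicated-client variants that use the package-level isRetryable with the wire's own error.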
165 changes: 165 additions & 0 deletions client_test.go
@@ -1239,6 +1239,171 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
})
}

func TestSingleClientLoadingRetry(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())

setup := func() (*singleClient, *mockConn) {
m := &mockConn{}
client, err := newSingleClient(
&ClientOption{InitAddress: []string{""}},
m,
func(dst string, opt *ClientOption) conn { return m },
newRetryer(defaultRetryDelayFn),
)
if err != nil {
t.Fatalf("unexpected err %v", err)
}
return client, m
}

t.Run("Do Retry on Loading", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoFn = func(cmd Completed) ValkeyResult {
attempts++
if attempts == 1 {
return newResult(ValkeyMessage{typ: '-', string: "LOADING Valkey is loading the dataset in memory"}, nil)
}
return newResult(ValkeyMessage{typ: '+', string: "OK"}, nil)
}

if v, err := client.Do(context.Background(), client.B().Get().Key("test").Build()).ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
if attempts != 2 {
t.Fatalf("expected 2 attempts, got %v", attempts)
}
})

t.Run("Do not retry on non-loading errors", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoFn = func(cmd Completed) ValkeyResult {
attempts++
if attempts == 1 {
return newResult(ValkeyMessage{typ: '-', string: "ERR some other error"}, nil)
}
return newResult(ValkeyMessage{typ: '+', string: "OK"}, nil)
}

if err := client.Do(context.Background(), client.B().Get().Key("test").Build()).Error(); err == nil {
t.Fatal("expected error but got nil")
}
if attempts != 1 {
t.Fatalf("unexpected attempts %v, expected no retry", attempts)
}
})

t.Run("DoMulti Retry on Loading", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoMultiFn = func(multi ...Completed) *valkeyresults {
attempts++
if attempts == 1 {
return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: '-', string: "LOADING Valkey is loading the dataset in memory"}, nil)}}
}
return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: '+', string: "OK"}, nil)}}
}

cmd := client.B().Get().Key("test").Build()
resps := client.DoMulti(context.Background(), cmd)
if len(resps) != 1 {
t.Fatalf("unexpected response length %v", len(resps))
}
if v, err := resps[0].ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
})

t.Run("DoCache Retry on Loading", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoCacheFn = func(cmd Cacheable, ttl time.Duration) ValkeyResult {
attempts++
if attempts == 1 {
return newResult(ValkeyMessage{typ: '-', string: "LOADING Valkey is loading the dataset in memory"}, nil)
}
return newResult(ValkeyMessage{typ: '+', string: "OK"}, nil)
}

cmd := client.B().Get().Key("test").Cache()
if v, err := client.DoCache(context.Background(), cmd, time.Minute).ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
})

t.Run("DoMultiCache Retry on Loading", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoMultiCacheFn = func(multi ...CacheableTTL) *valkeyresults {
attempts++
if attempts == 1 {
return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: '-', string: "LOADING Valkey is loading the dataset in memory"}, nil)}}
}
return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: '+', string: "OK"}, nil)}}
}

cmd := client.B().Get().Key("test").Cache()
resps := client.DoMultiCache(context.Background(), CT(cmd, time.Minute))
if len(resps) != 1 {
t.Fatalf("unexpected response length %v", len(resps))
}
if v, err := resps[0].ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
})

t.Run("Dedicated Do Retry on Loading", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoFn = func(cmd Completed) ValkeyResult {
attempts++
if attempts == 1 {
return newResult(ValkeyMessage{typ: '-', string: "LOADING Valkey is loading the dataset in memory"}, nil)
}
return newResult(ValkeyMessage{typ: '+', string: "OK"}, nil)
}
m.AcquireFn = func() wire { return &mockWire{DoFn: m.DoFn} }

err := client.Dedicated(func(c DedicatedClient) error {
if v, err := c.Do(context.Background(), c.B().Get().Key("test").Build()).ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
return nil
})
if err != nil {
t.Fatalf("unexpected err %v", err)
}
})

t.Run("Dedicated DoMulti Retry on Loading", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoMultiFn = func(multi ...Completed) *valkeyresults {
attempts++
if attempts == 1 {
return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: '-', string: "LOADING Valkey is loading the dataset in memory"}, nil)}}
}
return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: '+', string: "OK"}, nil)}}
}
m.AcquireFn = func() wire { return &mockWire{DoMultiFn: m.DoMultiFn} }

err := client.Dedicated(func(c DedicatedClient) error {
resps := c.DoMulti(context.Background(), c.B().Get().Key("test").Build())
if len(resps) != 1 {
t.Fatalf("unexpected response length %v", len(resps))
}
if v, err := resps[0].ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
return nil
})
if err != nil {
t.Fatalf("unexpected err %v", err)
}
})
}

func BenchmarkSingleClient_DoCache(b *testing.B) {
ctx := context.Background()
client, err := NewClient(ClientOption{InitAddress: []string{"127.0.0.1:6379"}, Dialer: net.Dialer{KeepAlive: -1}})
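The new tests drive this path by having the mock connection return an error reply whose text starts with LOADING. A small in-package sketch of the mechanism they rely on follows; the test name is hypothetical, while newResult, ValkeyMessage, and IsLoading are the same in-package helpers used in the tests above.

```go
// Hypothetical in-package test: a message with typ '-' is an error reply,
// Error() is assumed to surface it as *ValkeyError, and IsLoading is assumed
// to match the "LOADING ..." prefix a node returns while restoring its dataset.
func TestLoadingReplyIsDetected(t *testing.T) {
	r := newResult(ValkeyMessage{typ: '-', string: "LOADING Valkey is loading the dataset in memory"}, nil)
	if err, ok := r.Error().(*ValkeyError); !ok || !err.IsLoading() {
		t.Fatalf("expected a LOADING ValkeyError, got %v", r.Error())
	}
}
```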
4 changes: 2 additions & 2 deletions cluster.go
@@ -1126,13 +1126,13 @@ func (c *clusterClient) Close() {
}

func (c *clusterClient) shouldRefreshRetry(err error, ctx context.Context) (addr string, mode RedirectMode) {
- if err != nil && atomic.LoadUint32(&c.stop) == 0 {
+ if err != nil && err != Nil && err != ErrDoCacheAborted && atomic.LoadUint32(&c.stop) == 0 {
if err, ok := err.(*ValkeyError); ok {
if addr, ok = err.IsMoved(); ok {
mode = RedirectMove
} else if addr, ok = err.IsAsk(); ok {
mode = RedirectAsk
- } else if err.IsClusterDown() || err.IsTryAgain() {
+ } else if err.IsClusterDown() || err.IsTryAgain() || err.IsLoading() {
mode = RedirectRetry
}
} else if ctx.Err() == nil {
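In the cluster client, a LOADING reply is now grouped with CLUSTERDOWN and TRYAGAIN as a plain retry with no redirect target, while Nil results and aborted client-side-cache operations are excluded up front. A simplified paraphrase of shouldRefreshRetry after this change (hypothetical method name, same in-package caveats as above):

```go
// Simplified paraphrase of clusterClient.shouldRefreshRetry after this change.
func (c *clusterClient) classifyRetry(err error, ctx context.Context) (addr string, mode RedirectMode) {
	if err == nil || err == Nil || err == ErrDoCacheAborted || atomic.LoadUint32(&c.stop) != 0 {
		return // neither redirected nor retried
	}
	if verr, ok := err.(*ValkeyError); ok {
		if addr, ok = verr.IsMoved(); ok {
			mode = RedirectMove // follow MOVED to the owning node
		} else if addr, ok = verr.IsAsk(); ok {
			mode = RedirectAsk // one-shot redirect with ASKING
		} else if verr.IsClusterDown() || verr.IsTryAgain() || verr.IsLoading() {
			mode = RedirectRetry // re-send after a delay, no redirect target
		}
	} else if ctx.Err() == nil {
		mode = RedirectRetry // non-Valkey (e.g. transport) error: retry
	}
	return
}
```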
(Diffs for the remaining 4 changed files are not shown here.)
