diff --git a/pipe.go b/pipe.go index 92bbc5d..07f0ffb 100644 --- a/pipe.go +++ b/pipe.go @@ -28,8 +28,9 @@ var noHello = regexp.MustCompile("unknown command .?(HELLO|hello).?") // See https://github.com/redis/rueidis/pull/691 func isUnsubReply(msg *ValkeyMessage) bool { // ex. NOPERM User limiteduser has no permissions to run the 'ping' command - // ex. LOADING Redis is loading the dataset in memory - if msg.typ == '-' && (strings.HasPrefix(msg.string, "LOADING") || strings.Contains(msg.string, "'ping'")) { + // ex. LOADING server is loading the dataset in memory + // ex. BUSY + if msg.typ == '-' && (strings.HasPrefix(msg.string, "LOADING") || strings.HasPrefix(msg.string, "BUSY") || strings.Contains(msg.string, "'ping'")) { msg.typ = '+' msg.string = "PONG" return true diff --git a/pipe_test.go b/pipe_test.go index 043f27a..c80133a 100644 --- a/pipe_test.go +++ b/pipe_test.go @@ -3337,11 +3337,98 @@ func TestPubSub(t *testing.T) { builder.Get().Key("mk").Build(), } - replies := [][]RedisMessage{ + replies := [][]ValkeyMessage{ + { + { // proactive unsubscribe before user unsubscribe + typ: '>', + values: []ValkeyMessage{ + {typ: '+', string: "sunsubscribe"}, + {typ: '+', string: "a"}, + {typ: ':', integer: 0}, + }, + }, + { // proactive unsubscribe before user unsubscribe + typ: '>', + values: []ValkeyMessage{ + {typ: '+', string: "sunsubscribe"}, + {typ: '+', string: "b"}, + {typ: ':', integer: 0}, + }, + }, + }, { + // empty + }, { + { // proactive unsubscribe after user unsubscribe + typ: '>', + values: []ValkeyMessage{ + {typ: '+', string: "sunsubscribe"}, + {typ: '_'}, + {typ: ':', integer: 0}, + }, + }, + {typ: '+', string: "mk"}, + }, + } + + p.background() + + // proactive unsubscribe before other commands + mock.Expect().Reply(ValkeyMessage{ // proactive unsubscribe before user unsubscribe + typ: '>', + values: []ValkeyMessage{ + {typ: '+', string: "sunsubscribe"}, + {typ: '+', string: "0"}, + {typ: ':', integer: 0}, + }, + }) + + time.Sleep(time.Millisecond * 100) + + for i, cmd1 := range commands { + cmd2 := builder.Get().Key(strconv.Itoa(i)).Build() + go func() { + if cmd1.IsUnsub() { + mock.Expect(cmd1.Commands()...).Expect(cmds.PingCmd.Commands()...). + Reply(replies[i]...). + Reply(ValkeyMessage{ // failed unsubReply + typ: '-', + string: "LOADING server is loading the dataset in memory", + }).Expect(cmd2.Commands()...).ReplyString(strconv.Itoa(i)) + } else { + mock.Expect(cmd1.Commands()...).Reply(replies[i]...).Expect(cmd2.Commands()...).ReplyString(strconv.Itoa(i)) + } + }() + if i == 2 { + if v, err := p.Do(ctx, cmd1).ToString(); err != nil || v != "mk" { + t.Fatalf("unexpected err %v", err) + } + } else { + if err := p.Do(ctx, cmd1).Error(); err != nil { + t.Fatalf("unexpected err %v", err) + } + } + if v, err := p.Do(ctx, cmd2).ToString(); err != nil || v != strconv.Itoa(i) { + t.Fatalf("unexpected val %v %v", v, err) + } + } + cancel() + }) + + t.Run("PubSub unsubReply failed because of error BUSY from server", func(t *testing.T) { + ctx := context.Background() + p, mock, cancel, _ := setup(t, ClientOption{}) + + commands := []Completed{ + builder.Sunsubscribe().Channel("1").Build(), + builder.Sunsubscribe().Channel("2").Build(), + builder.Get().Key("mk").Build(), + } + + replies := [][]ValkeyMessage{ { { // proactive unsubscribe before user unsubscribe typ: '>', - values: []RedisMessage{ + values: []ValkeyMessage{ {typ: '+', string: "sunsubscribe"}, {typ: '+', string: "a"}, {typ: ':', integer: 0}, @@ -3349,7 +3436,7 @@ func TestPubSub(t *testing.T) { }, { // proactive unsubscribe before user unsubscribe typ: '>', - values: []RedisMessage{ + values: []ValkeyMessage{ {typ: '+', string: "sunsubscribe"}, {typ: '+', string: "b"}, {typ: ':', integer: 0}, @@ -3360,7 +3447,7 @@ func TestPubSub(t *testing.T) { }, { { // proactive unsubscribe after user unsubscribe typ: '>', - values: []RedisMessage{ + values: []ValkeyMessage{ {typ: '+', string: "sunsubscribe"}, {typ: '_'}, {typ: ':', integer: 0}, @@ -3373,9 +3460,9 @@ func TestPubSub(t *testing.T) { p.background() // proactive unsubscribe before other commands - mock.Expect().Reply(RedisMessage{ // proactive unsubscribe before user unsubscribe + mock.Expect().Reply(ValkeyMessage{ // proactive unsubscribe before user unsubscribe typ: '>', - values: []RedisMessage{ + values: []ValkeyMessage{ {typ: '+', string: "sunsubscribe"}, {typ: '+', string: "0"}, {typ: ':', integer: 0}, @@ -3390,9 +3477,9 @@ func TestPubSub(t *testing.T) { if cmd1.IsUnsub() { mock.Expect(cmd1.Commands()...).Expect(cmds.PingCmd.Commands()...). Reply(replies[i]...). - Reply(RedisMessage{ // failed unsubReply + Reply(ValkeyMessage{ // failed unsubReply typ: '-', - string: "LOADING Redis is loading the dataset in memory", + string: "BUSY", }).Expect(cmd2.Commands()...).ReplyString(strconv.Itoa(i)) } else { mock.Expect(cmd1.Commands()...).Reply(replies[i]...).Expect(cmd2.Commands()...).ReplyString(strconv.Itoa(i))