Skip to content

Commit

Permalink
feat: make Message fields be private, move cache/proto/queue pkg to root
Browse files Browse the repository at this point in the history
  • Loading branch information
rueian committed Jan 4, 2022
1 parent 92007d6 commit 56bf905
Show file tree
Hide file tree
Showing 35 changed files with 1,008 additions and 1,456 deletions.
9 changes: 4 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"time"

"github.com/rueian/rueidis/internal/cmds"
"github.com/rueian/rueidis/internal/proto"
)

type singleClient struct {
Expand Down Expand Up @@ -33,13 +32,13 @@ func (c *singleClient) B() *cmds.Builder {
return c.cmd
}

func (c *singleClient) Do(ctx context.Context, cmd cmds.Completed) (resp proto.Result) {
func (c *singleClient) Do(ctx context.Context, cmd cmds.Completed) (resp RedisResult) {
resp = c.conn.Do(cmd)
c.cmd.Put(cmd.CommandSlice())
return resp
}

func (c *singleClient) DoCache(ctx context.Context, cmd cmds.Cacheable, ttl time.Duration) (resp proto.Result) {
func (c *singleClient) DoCache(ctx context.Context, cmd cmds.Cacheable, ttl time.Duration) (resp RedisResult) {
resp = c.conn.DoCache(cmd, ttl)
c.cmd.Put(cmd.CommandSlice())
return resp
Expand All @@ -65,13 +64,13 @@ func (c *dedicatedSingleClient) B() *cmds.Builder {
return c.cmd
}

func (c *dedicatedSingleClient) Do(ctx context.Context, cmd cmds.Completed) (resp proto.Result) {
func (c *dedicatedSingleClient) Do(ctx context.Context, cmd cmds.Completed) (resp RedisResult) {
resp = c.wire.Do(cmd)
c.cmd.Put(cmd.CommandSlice())
return resp
}

func (c *dedicatedSingleClient) DoMulti(ctx context.Context, multi ...cmds.Completed) (resp []proto.Result) {
func (c *dedicatedSingleClient) DoMulti(ctx context.Context, multi ...cmds.Completed) (resp []RedisResult) {
if len(multi) == 0 {
return nil
}
Expand Down
40 changes: 19 additions & 21 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@ import (
"time"

"github.com/rueian/rueidis/internal/cmds"
"github.com/rueian/rueidis/internal/mock"
"github.com/rueian/rueidis/internal/proto"
)

type mockConn struct {
DoFn func(cmd cmds.Completed) proto.Result
DoCacheFn func(cmd cmds.Cacheable, ttl time.Duration) proto.Result
DoMultiFn func(multi ...cmds.Completed) []proto.Result
InfoFn func() map[string]proto.Message
DoFn func(cmd cmds.Completed) RedisResult
DoCacheFn func(cmd cmds.Cacheable, ttl time.Duration) RedisResult
DoMultiFn func(multi ...cmds.Completed) []RedisResult
InfoFn func() map[string]RedisMessage
ErrorFn func() error
CloseFn func()
DialFn func() error
Expand Down Expand Up @@ -46,28 +44,28 @@ func (m *mockConn) Store(w wire) {
}
}

func (m *mockConn) Do(cmd cmds.Completed) proto.Result {
func (m *mockConn) Do(cmd cmds.Completed) RedisResult {
if m.DoFn != nil {
return m.DoFn(cmd)
}
return proto.Result{}
return RedisResult{}
}

func (m *mockConn) DoCache(cmd cmds.Cacheable, ttl time.Duration) proto.Result {
func (m *mockConn) DoCache(cmd cmds.Cacheable, ttl time.Duration) RedisResult {
if m.DoCacheFn != nil {
return m.DoCacheFn(cmd, ttl)
}
return proto.Result{}
return RedisResult{}
}

func (m *mockConn) DoMulti(multi ...cmds.Completed) []proto.Result {
func (m *mockConn) DoMulti(multi ...cmds.Completed) []RedisResult {
if m.DoMultiFn != nil {
return m.DoMultiFn(multi...)
}
return nil
}

func (m *mockConn) Info() map[string]proto.Message {
func (m *mockConn) Info() map[string]RedisMessage {
if m.InfoFn != nil {
return m.InfoFn()
}
Expand Down Expand Up @@ -125,11 +123,11 @@ func TestSingleClient(t *testing.T) {

t.Run("Delegate Do", func(t *testing.T) {
c := client.B().Get().Key("Do").Build()
m.DoFn = func(cmd cmds.Completed) proto.Result {
m.DoFn = func(cmd cmds.Completed) RedisResult {
if !reflect.DeepEqual(cmd.Commands(), c.Commands()) {
t.Fatalf("unexpected command %v", cmd)
}
return proto.NewResult(proto.Message{Type: '+', String: "Do"}, nil)
return newResult(RedisMessage{typ: '+', string: "Do"}, nil)
}
if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "Do" {
t.Fatalf("unexpected response %v %v", v, err)
Expand All @@ -138,11 +136,11 @@ func TestSingleClient(t *testing.T) {

t.Run("Delegate DoCache", func(t *testing.T) {
c := client.B().Get().Key("DoCache").Cache()
m.DoCacheFn = func(cmd cmds.Cacheable, ttl time.Duration) proto.Result {
m.DoCacheFn = func(cmd cmds.Cacheable, ttl time.Duration) RedisResult {
if !reflect.DeepEqual(cmd.Commands(), c.Commands()) || ttl != 100 {
t.Fatalf("unexpected command %v, %v", cmd, ttl)
}
return proto.NewResult(proto.Message{Type: '+', String: "DoCache"}, nil)
return newResult(RedisMessage{typ: '+', string: "DoCache"}, nil)
}
if v, err := client.DoCache(context.Background(), c, 100).ToString(); err != nil || v != "DoCache" {
t.Fatalf("unexpected response %v %v", v, err)
Expand All @@ -168,12 +166,12 @@ func TestSingleClient(t *testing.T) {
})

t.Run("Dedicated Delegate", func(t *testing.T) {
w := &mock.Wire{
DoFn: func(cmd cmds.Completed) proto.Result {
return proto.NewResult(proto.Message{Type: '+', String: "Delegate"}, nil)
w := &mockWire{
DoFn: func(cmd cmds.Completed) RedisResult {
return newResult(RedisMessage{typ: '+', string: "Delegate"}, nil)
},
DoMultiFn: func(cmd ...cmds.Completed) []proto.Result {
return []proto.Result{proto.NewResult(proto.Message{Type: '+', String: "Delegate"}, nil)}
DoMultiFn: func(cmd ...cmds.Completed) []RedisResult {
return []RedisResult{newResult(RedisMessage{typ: '+', string: "Delegate"}, nil)}
},
}
m.AcquireFn = func() wire {
Expand Down
39 changes: 19 additions & 20 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/rueian/rueidis/internal/cmds"
"github.com/rueian/rueidis/internal/proto"
)

var ErrNoSlot = errors.New("the slot has no redis node")
Expand Down Expand Up @@ -84,7 +83,7 @@ func (c *clusterClient) refresh() (err error) {
}

func (c *clusterClient) _refresh() (err error) {
var reply proto.Message
var reply RedisMessage
var dead []string

retry:
Expand Down Expand Up @@ -114,7 +113,7 @@ retry:
return err
}

if len(reply.Values) == 0 {
if len(reply.values) == 0 {
if _, err = c.init(); err != nil {
return err
}
Expand Down Expand Up @@ -177,20 +176,20 @@ type group struct {
slots [][2]int64
}

func parseSlots(slots proto.Message) map[string]group {
groups := make(map[string]group, len(slots.Values))
for _, v := range slots.Values {
master := fmt.Sprintf("%s:%d", v.Values[2].Values[0].String, v.Values[2].Values[1].Integer)
func parseSlots(slots RedisMessage) map[string]group {
groups := make(map[string]group, len(slots.values))
for _, v := range slots.values {
master := fmt.Sprintf("%s:%d", v.values[2].values[0].string, v.values[2].values[1].integer)
g, ok := groups[master]
if !ok {
g.slots = make([][2]int64, 0)
g.nodes = make([]string, 0, len(v.Values)-2)
for i := 2; i < len(v.Values); i++ {
dst := fmt.Sprintf("%s:%d", v.Values[i].Values[0].String, v.Values[i].Values[1].Integer)
g.nodes = make([]string, 0, len(v.values)-2)
for i := 2; i < len(v.values); i++ {
dst := fmt.Sprintf("%s:%d", v.values[i].values[0].string, v.values[i].values[1].integer)
g.nodes = append(g.nodes, dst)
}
}
g.slots = append(g.slots, [2]int64{v.Values[0].Integer, v.Values[1].Integer})
g.slots = append(g.slots, [2]int64{v.values[0].integer, v.values[1].integer})
groups[master] = g
}
return groups
Expand Down Expand Up @@ -242,11 +241,11 @@ func (c *clusterClient) B() *cmds.Builder {
return c.cmd
}

func (c *clusterClient) Do(ctx context.Context, cmd cmds.Completed) (resp proto.Result) {
func (c *clusterClient) Do(ctx context.Context, cmd cmds.Completed) (resp RedisResult) {
retry:
cc, err := c.pick(cmd.Slot())
if err != nil {
resp = proto.NewErrResult(err)
resp = newErrResult(err)
goto ret
}
resp = cc.Do(cmd)
Expand All @@ -269,11 +268,11 @@ ret:
return resp
}

func (c *clusterClient) DoCache(ctx context.Context, cmd cmds.Cacheable, ttl time.Duration) (resp proto.Result) {
func (c *clusterClient) DoCache(ctx context.Context, cmd cmds.Cacheable, ttl time.Duration) (resp RedisResult) {
retry:
cc, err := c.pick(cmd.Slot())
if err != nil {
resp = proto.NewErrResult(err)
resp = newErrResult(err)
goto ret
}
resp = cc.DoCache(cmd, ttl)
Expand Down Expand Up @@ -355,18 +354,18 @@ func (c *dedicatedClusterClient) B() *cmds.Builder {
return c.cmd
}

func (c *dedicatedClusterClient) Do(ctx context.Context, cmd cmds.Completed) (resp proto.Result) {
func (c *dedicatedClusterClient) Do(ctx context.Context, cmd cmds.Completed) (resp RedisResult) {
c.check(cmd.Slot())
if err := c.acquire(); err != nil {
return proto.NewErrResult(err)
return newErrResult(err)
} else {
resp = c.wire.Do(cmd)
}
c.cmd.Put(cmd.CommandSlice())
return resp
}

func (c *dedicatedClusterClient) DoMulti(ctx context.Context, multi ...cmds.Completed) (resp []proto.Result) {
func (c *dedicatedClusterClient) DoMulti(ctx context.Context, multi ...cmds.Completed) (resp []RedisResult) {
if len(multi) == 0 {
return nil
}
Expand All @@ -376,9 +375,9 @@ func (c *dedicatedClusterClient) DoMulti(ctx context.Context, multi ...cmds.Comp
if err := c.acquire(); err == nil {
resp = c.wire.DoMulti(multi...)
} else {
resp = make([]proto.Result, len(multi))
resp = make([]RedisResult, len(multi))
for i := range resp {
resp[i] = proto.NewErrResult(err)
resp[i] = newErrResult(err)
}
}
for _, cmd := range multi {
Expand Down
Loading

0 comments on commit 56bf905

Please sign in to comment.