
refactor: shorten naming, simplify conditional branches in clusterclient
rueian committed Nov 21, 2023
1 parent cb76d15 commit f81f163
Showing 3 changed files with 46 additions and 82 deletions.
116 changes: 40 additions & 76 deletions cluster.go
@@ -18,7 +18,7 @@ import (

// ErrNoSlot indicates that no redis node owns the key slot.
var ErrNoSlot = errors.New("the slot has no redis node")
var ErrReplicaOnlyConflictsWithSendToReplicas = errors.New("ReplicaOnly conflicts with SendToReplicas option")
var ErrReplicaOnlyConflict = errors.New("ReplicaOnly conflicts with SendToReplicas option")

type retry struct {
cIndexes []int
@@ -101,7 +101,7 @@ func newClusterClient(opt *ClientOption, connFn connFn) (client *clusterClient,
}

if opt.ReplicaOnly && opt.SendToReplicas != nil {
return nil, ErrReplicaOnlyConflictsWithSendToReplicas
return nil, ErrReplicaOnlyConflict
}

client.connFn = func(dst string, opt *ClientOption) conn {
@@ -222,18 +222,10 @@ func (c *clusterClient) _refresh() (err error) {

groups := result.parse(c.opt.TLSConfig != nil)
conns := make(map[string]connrole, len(groups))
for _, g := range groups {
for i, addr := range g.nodes {
if i == 0 {
conns[addr] = connrole{
conn: c.connFn(addr, c.opt),
}
} else {
conns[addr] = connrole{
conn: c.connFn(addr, c.opt),
replica: true,
}
}
for master, g := range groups {
conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false}
for _, addr := range g.nodes[1:] {
conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true}
}
}
// make sure InitAddress is always present
@@ -249,10 +241,10 @@ func (c *clusterClient) _refresh() (err error) {

c.mu.RLock()
for addr, cc := range c.conns {
if newConn, ok := conns[addr]; ok {
if fresh, ok := conns[addr]; ok {
conns[addr] = connrole{
conn: cc.conn,
replica: newConn.replica,
replica: fresh.replica,
}
} else {
removes = append(removes, cc.conn)
@@ -275,7 +267,6 @@ func (c *clusterClient) _refresh() (err error) {
if len(rslots) == 0 { // lazy init
rslots = make([]conn, 16384)
}

if len(g.nodes) > 1 {
for _, slot := range g.slots {
for i := slot[0]; i <= slot[1]; i++ {
@@ -410,44 +401,31 @@ func parseShards(shards RedisMessage, defaultAddr string, tls bool) map[string]g
return groups
}

func (c *clusterClient) _pick(slot uint16, isSendToReplicas bool) (p conn) {
func (c *clusterClient) _pick(slot uint16, toReplica bool) (p conn) {
c.mu.RLock()
if c.opt.SendToReplicas == nil {
if slot == cmds.InitSlot {
for _, cc := range c.conns {
p = cc.conn
break
if slot == cmds.InitSlot {
for _, cc := range c.conns {
if cc.replica {
continue
}
} else {
p = c.pslots[slot]
p = cc.conn
break
}
} else if toReplica && c.rslots != nil {
p = c.rslots[slot]
} else {
switch {
case slot == cmds.InitSlot:
for _, cc := range c.conns {
if cc.replica {
continue
}

p = cc.conn
break
}
case isSendToReplicas:
p = c.rslots[slot]
default:
p = c.pslots[slot]
}
p = c.pslots[slot]
}
c.mu.RUnlock()
return p
}

func (c *clusterClient) pick(ctx context.Context, slot uint16, isSendToReplicas bool) (p conn, err error) {
if p = c._pick(slot, isSendToReplicas); p == nil {
func (c *clusterClient) pick(ctx context.Context, slot uint16, toReplica bool) (p conn, err error) {
if p = c._pick(slot, toReplica); p == nil {
if err := c.refresh(ctx); err != nil {
return nil, err
}
if p = c._pick(slot, isSendToReplicas); p == nil {
if p = c._pick(slot, toReplica); p == nil {
return nil, ErrNoSlot
}
}
@@ -465,10 +443,7 @@ func (c *clusterClient) redirectOrNew(addr string, prev conn) (p conn) {

if cc = c.conns[addr]; cc.conn == nil {
p = c.connFn(addr, c.opt)
// how to know if the new connection is replica node connection?
c.conns[addr] = connrole{
conn: p,
}
c.conns[addr] = connrole{conn: p, replica: false}
} else if prev == cc.conn {
// try reconnection if the MOVED redirects to the same host,
// because the same hostname may actually be resolved into another destination
@@ -478,10 +453,7 @@ func (c *clusterClient) redirectOrNew(addr string, prev conn) (p conn) {
prev.Close()
}(prev)
p = c.connFn(addr, c.opt)
c.conns[addr] = connrole{
conn: p,
replica: cc.replica,
}
c.conns[addr] = connrole{conn: p, replica: cc.replica}
}
c.mu.Unlock()
return p
@@ -500,7 +472,7 @@ func (c *clusterClient) Do(ctx context.Context, cmd Completed) (resp RedisResult

func (c *clusterClient) do(ctx context.Context, cmd Completed) (resp RedisResult) {
retry:
cc, err := c.pick(ctx, cmd.Slot(), c.isSendToReplicas(cmd))
cc, err := c.pick(ctx, cmd.Slot(), c.toReplica(cmd))
if err != nil {
return newErrResult(err)
}
@@ -524,14 +496,14 @@ process:
return resp
}

func (c *clusterClient) isSendToReplicas(cmd Completed) bool {
func (c *clusterClient) toReplica(cmd Completed) bool {
if c.opt.SendToReplicas != nil {
return c.opt.SendToReplicas(cmd)
}
return false
}

func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, last uint16, isSendToReplicas bool) {
func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, last uint16, toReplica bool) {
last = cmds.InitSlot
init := false

@@ -547,7 +519,7 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, last

count := conncountp.Get(len(c.conns), len(c.conns))

if !init && c.opt.SendToReplicas != nil {
if !init && c.rslots != nil && c.opt.SendToReplicas != nil {
for _, cmd := range multi {
var p conn
if c.opt.SendToReplicas(cmd) {
@@ -558,7 +530,6 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, last
if p == nil {
return nil, 0, false
}

count.m[p]++
}

@@ -568,15 +539,12 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, last
}
conncountp.Put(count)

replicaCommandCount := 0
for i, cmd := range multi {
if i == 0 {
last = cmd.Slot()
}
last = cmd.Slot()

var cc conn
if c.opt.SendToReplicas(cmd) {
replicaCommandCount++
toReplica = true
cc = c.rslots[cmd.Slot()]
} else {
cc = c.pslots[cmd.Slot()]
Expand All @@ -586,8 +554,7 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, last
re.commands = append(re.commands, cmd)
re.cIndexes = append(re.cIndexes, i)
}

return retries, last, replicaCommandCount == len(multi)
return retries, last, toReplica
}

for _, cmd := range multi {
@@ -620,21 +587,20 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, last
re.cIndexes = append(re.cIndexes, i)
}
}

return retries, last, false
}

func (c *clusterClient) pickMulti(ctx context.Context, multi []Completed) (*connretry, uint16, bool, error) {
conns, slot, isSendToReplicas := c._pickMulti(multi)
conns, slot, toReplica := c._pickMulti(multi)
if conns == nil {
if err := c.refresh(ctx); err != nil {
return nil, 0, false, err
}
if conns, slot, isSendToReplicas = c._pickMulti(multi); conns == nil {
if conns, slot, toReplica = c._pickMulti(multi); conns == nil {
return nil, 0, false, ErrNoSlot
}
}
return conns, slot, isSendToReplicas, nil
return conns, slot, toReplica, nil
}

func (c *clusterClient) doresultfn(ctx context.Context, results *redisresults, retries *connretry, mu *sync.Mutex, cc conn, cIndexes []int, commands []Completed, resps []RedisResult) {
@@ -692,7 +658,7 @@ func (c *clusterClient) DoMulti(ctx context.Context, multi ...Completed) []Redis
return nil
}

retries, slot, isSendToReplicas, err := c.pickMulti(ctx, multi)
retries, slot, toReplica, err := c.pickMulti(ctx, multi)
if err != nil {
return fillErrs(len(multi), err)
}
@@ -702,7 +668,7 @@ func (c *clusterClient) DoMulti(ctx context.Context, multi ...Completed) []Redis
for _, re := range retries.m {
retryp.Put(re)
}
return c.doMulti(ctx, slot, multi, isSendToReplicas)
return c.doMulti(ctx, slot, multi, toReplica)
}

var wg sync.WaitGroup
@@ -740,9 +706,9 @@ func fillErrs(n int, err error) (results []RedisResult) {
return results
}

func (c *clusterClient) doMulti(ctx context.Context, slot uint16, multi []Completed, isSendToReplicas bool) []RedisResult {
func (c *clusterClient) doMulti(ctx context.Context, slot uint16, multi []Completed, toReplica bool) []RedisResult {
retry:
cc, err := c.pick(ctx, slot, isSendToReplicas)
cc, err := c.pick(ctx, slot, toReplica)
if err != nil {
return fillErrs(len(multi), err)
}
@@ -775,7 +741,7 @@ process:

func (c *clusterClient) doCache(ctx context.Context, cmd Cacheable, ttl time.Duration) (resp RedisResult) {
retry:
cc, err := c.pick(ctx, cmd.Slot(), c.isSendToReplicas(Completed(cmd)))
cc, err := c.pick(ctx, cmd.Slot(), c.toReplica(Completed(cmd)))
if err != nil {
return newErrResult(err)
}
@@ -845,7 +811,7 @@ func (c *clusterClient) _pickMultiCache(multi []CacheableTTL) *connretrycache {
defer c.mu.RUnlock()

count := conncountp.Get(len(c.conns), len(c.conns))
if c.opt.SendToReplicas == nil {
if c.opt.SendToReplicas == nil || c.rslots == nil {
for _, cmd := range multi {
p := c.pslots[cmd.Cmd.Slot()]
if p == nil {
@@ -876,7 +842,6 @@ func (c *clusterClient) _pickMultiCache(multi []CacheableTTL) *connretrycache {
} else {
p = c.pslots[cmd.Cmd.Slot()]
}

if p == nil {
return nil
}
@@ -896,7 +861,6 @@ func (c *clusterClient) _pickMultiCache(multi []CacheableTTL) *connretrycache {
} else {
cc = c.pslots[cmd.Cmd.Slot()]
}

re := retries.m[cc]
re.commands = append(re.commands, cmd)
re.cIndexes = append(re.cIndexes, i)
@@ -1009,7 +973,7 @@ retry:

func (c *clusterClient) Receive(ctx context.Context, subscribe Completed, fn func(msg PubSubMessage)) (err error) {
retry:
cc, err := c.pick(ctx, subscribe.Slot(), c.isSendToReplicas(subscribe))
cc, err := c.pick(ctx, subscribe.Slot(), c.toReplica(subscribe))
if err != nil {
goto ret
}
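Taken together, the cluster.go changes collapse the replica-routing decision into one guard: consult the replica slot table only when the command is flagged for replicas and that table has actually been built. Below is a minimal, hedged sketch of that rule; the pslots/rslots names mirror the diff above, but the string-slice modeling is illustrative, not the library's real conn types.

package main

import "fmt"

// pickAddr mirrors the simplified branch in _pick: fall back to the
// primary slot table whenever no replica table exists. The real code
// stores conns and holds a read lock; this sketch uses plain strings.
func pickAddr(pslots, rslots []string, slot int, toReplica bool) string {
	if toReplica && rslots != nil {
		return rslots[slot] // rslots is only built when SendToReplicas is set
	}
	return pslots[slot]
}

func main() {
	pslots := []string{"primary-0:6379", "primary-1:6379"}
	rslots := []string{"replica-0:6379", "replica-1:6379"}
	fmt.Println(pickAddr(pslots, rslots, 1, true))  // replica-1:6379
	fmt.Println(pickAddr(pslots, nil, 1, true))     // primary-1:6379 (no replica table yet)
	fmt.Println(pickAddr(pslots, rslots, 1, false)) // primary-1:6379
}

The same nil-guard appears in _pickMulti and _pickMultiCache above, which now check c.rslots != nil before consulting the replica table.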
10 changes: 5 additions & 5 deletions rueidis.go
@@ -71,6 +71,11 @@ type ClientOption struct {
// Note that this function must be fast, otherwise other redis messages will be blocked.
OnInvalidations func([]RedisMessage)

// SendToReplicas is a function that returns true if the command should be sent to replicas.
// currently only used for cluster client.
// NOTE: This function can't be used with ReplicaOnly option.
SendToReplicas func(cmd Completed) bool

// Sentinel options, including MasterSet and Auth options
Sentinel SentinelOption

@@ -162,11 +167,6 @@ type ClientOption struct {
// the current connection will be excluded from the client eviction process
// even if we're above the configured client eviction threshold.
ClientNoEvict bool

// SendToReplicas is a function that returns true if the command should be sent to replicas.
// currently only used for cluster client.
// NOTE: This function can't be used with ReplicaOnly option.
SendToReplicas func(cmd Completed) bool
}

// SentinelOption contains MasterSet,
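For context on the relocated option, here is a hedged usage sketch. It assumes the public rueidis API (rueidis.NewClient, the client.B() command builder, and Completed.IsReadOnly) behaves as documented for releases around this commit; it is not code taken from the commit itself.

package main

import (
	"context"

	"github.com/redis/rueidis"
)

func main() {
	// Route read-only commands to replica nodes; everything else stays
	// on the primaries. Per the doc comment above, SendToReplicas must
	// not be combined with ReplicaOnly (see ErrReplicaOnlyConflict).
	client, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress: []string{"127.0.0.1:7001"},
		SendToReplicas: func(cmd rueidis.Completed) bool {
			return cmd.IsReadOnly()
		},
	})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// GET is read-only, so this command is eligible for replica routing.
	client.Do(context.Background(), client.B().Get().Key("key").Build())
}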
2 changes: 1 addition & 1 deletion rueidis_test.go
@@ -148,7 +148,7 @@ func TestNewClusterClientError(t *testing.T) {
t.Errorf("unexpected return %v %v", client, err)
}

if !strings.Contains(err.Error(), ErrReplicaOnlyConflictsWithSendToReplicas.Error()) {
if !strings.Contains(err.Error(), ErrReplicaOnlyConflict.Error()) {
t.Errorf("unexpected error %v", err)
}
})
