Skip to content

Commit

Permalink
feat: add cluster shards refresh interval (#638)
Browse files Browse the repository at this point in the history
* feat: add clutser topology awareness

* fix: depense closed channel close

* refactor: make conditional refresh

* test: fix broken test

* test: change flaky test

* perf: use less call

* refactor: rollback

* perf: set detection logic

* refactor: rename cluster option

* refactor: divide lazy conditional refresh

* refactor: make reusable code

* refactor: merge refresh

* refactor: use parameter

* refactor: always refresh

* refactor: rename shardsrefreshinterval

Signed-off-by: Rueian <[email protected]>
  • Loading branch information
proost authored and rueian committed Oct 1, 2024
1 parent aba4970 commit 500529e
Show file tree
Hide file tree
Showing 3 changed files with 356 additions and 1 deletion.
27 changes: 26 additions & 1 deletion cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
// ErrNoSlot indicates that there is no valkey node owns the key slot.
var ErrNoSlot = errors.New("the slot has no valkey node")
var ErrReplicaOnlyConflict = errors.New("ReplicaOnly conflicts with SendToReplicas option")
var ErrInvalidShardsRefreshInterval = errors.New("ShardsRefreshInterval must be greater than or equal to 0")

type clusterClient struct {
pslots [16384]conn
Expand All @@ -31,6 +32,7 @@ type clusterClient struct {
stop uint32
cmd Builder
retry bool
stopCh chan struct{}
}

// NOTE: connrole and conn must be initialized at the same time
Expand All @@ -46,6 +48,7 @@ func newClusterClient(opt *ClientOption, connFn connFn) (*clusterClient, error)
opt: opt,
conns: make(map[string]connrole),
retry: !opt.DisableRetry,
stopCh: make(chan struct{}),
}

if opt.ReplicaOnly && opt.SendToReplicas != nil {
Expand Down Expand Up @@ -74,6 +77,12 @@ func newClusterClient(opt *ClientOption, connFn connFn) (*clusterClient, error)
return client, err
}

if opt.ClusterOption.ShardsRefreshInterval > 0 {
go client.runClusterTopologyRefreshment()
} else if opt.ClusterOption.ShardsRefreshInterval < 0 {
return nil, ErrInvalidShardsRefreshInterval
}

return client, nil
}

Expand Down Expand Up @@ -358,6 +367,19 @@ func parseShards(shards ValkeyMessage, defaultAddr string, tls bool) map[string]
return groups
}

func (c *clusterClient) runClusterTopologyRefreshment() {
ticker := time.NewTicker(c.opt.ClusterOption.ShardsRefreshInterval)
defer ticker.Stop()
for {
select {
case <-c.stopCh:
return
case <-ticker.C:
c.lazyRefresh()
}
}
}

func (c *clusterClient) _pick(slot uint16, toReplica bool) (p conn) {
c.mu.RLock()
if slot == cmds.InitSlot {
Expand Down Expand Up @@ -1018,7 +1040,10 @@ func (c *clusterClient) Nodes() map[string]Client {
}

func (c *clusterClient) Close() {
atomic.StoreUint32(&c.stop, 1)
if atomic.CompareAndSwapUint32(&c.stop, 0, 1) {
close(c.stopCh)
}

c.mu.RLock()
for _, cc := range c.conns {
go cc.conn.Close()
Expand Down
Loading

0 comments on commit 500529e

Please sign in to comment.