From e0e57fd63c9a5e3b21920d55bf94182772822b1b Mon Sep 17 00:00:00 2001 From: Ivan Kripakov Date: Mon, 27 Nov 2023 14:45:56 +0400 Subject: [PATCH 01/12] (WIP - consul like approach) add server_lookup to make dkron independent from server node IP on raft layer --- dkron/agent.go | 26 ++++++++++++--- dkron/serf.go | 3 +- dkron/server_lookup.go | 75 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 98 insertions(+), 6 deletions(-) create mode 100644 dkron/server_lookup.go diff --git a/dkron/agent.go b/dkron/agent.go index a261f397c..4a05b2aea 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -113,9 +113,10 @@ type Agent struct { // peers is used to track the known Dkron servers. This is // used for region forwarding and clustering. - peers map[string][]*ServerParts - localPeers map[raft.ServerAddress]*ServerParts - peerLock sync.RWMutex + peers map[string][]*ServerParts + localPeers map[raft.ServerAddress]*ServerParts + peerLock sync.RWMutex + serverLookup *ServerLookup activeExecutions sync.Map @@ -316,7 +317,19 @@ func (a *Agent) setupRaft() error { logger = a.logger.Logger.Writer() } - transport := raft.NewNetworkTransport(a.raftLayer, 3, raftTimeout, logger) + var serverAddressProvider raft.ServerAddressProvider = a.serverLookup + + transConfig := &raft.NetworkTransportConfig{ + Stream: a.raftLayer, + MaxPool: 3, + Timeout: 10 * time.Second, + ServerAddressProvider: serverAddressProvider, + } + transport := raft.NewNetworkTransportWithConfig(transConfig) + rpcIP := net.ParseIP(a.config.Tags["rpc_addr"]) + port, err := strconv.Atoi(a.config.Tags["port"]) + rpcAddr := &net.TCPAddr{IP: rpcIP, Port: port} + a.serverLookup.AddServer(&ServerParts{ID: a.config.NodeName, RPCAddr: rpcAddr}) a.raftTransport = transport config := raft.DefaultConfig() @@ -540,6 +553,7 @@ func (a *Agent) SetConfig(c *Config) { // StartServer launch a new dkron server process func (a *Agent) StartServer() { + a.serverLookup = NewServerLookup() if a.Store == nil { s, err := NewStore(a.logger) if err != nil { @@ -710,7 +724,9 @@ func (a *Agent) eventLoop() { a.localMemberEvent(me) case serf.EventMemberReap: a.localMemberEvent(me) - case serf.EventMemberUpdate, serf.EventUser, serf.EventQuery: // Ignore + case serf.EventMemberUpdate: + a.logger.WithField("event", e.String()).Info("agent: event member update") + case serf.EventUser, serf.EventQuery: // Ignore default: a.logger.WithField("event", e.String()).Warn("agent: Unhandled serf event") } diff --git a/dkron/serf.go b/dkron/serf.go index 6311aee02..2e18f174c 100644 --- a/dkron/serf.go +++ b/dkron/serf.go @@ -22,7 +22,7 @@ func (a *Agent) nodeJoin(me serf.MemberEvent) { continue } a.logger.WithField("server", parts.Name).Info("adding server") - + a.serverLookup.AddServer(parts) // Check if this server is known found := false a.peerLock.Lock() @@ -174,6 +174,7 @@ func (a *Agent) nodeFailed(me serf.MemberEvent) { delete(a.localPeers, raft.ServerAddress(parts.Addr.String())) } a.peerLock.Unlock() + a.serverLookup.RemoveServer(parts) } } diff --git a/dkron/server_lookup.go b/dkron/server_lookup.go new file mode 100644 index 000000000..1ac7c31e5 --- /dev/null +++ b/dkron/server_lookup.go @@ -0,0 +1,75 @@ +package dkron + +import ( + "fmt" + "sync" + + "github.com/hashicorp/raft" +) + +// ServerLookup encapsulates looking up servers by id and address +type ServerLookup struct { + lock sync.RWMutex + addressToServer map[raft.ServerAddress]*ServerParts + idToServer map[raft.ServerID]*ServerParts +} + +func NewServerLookup() *ServerLookup { + return &ServerLookup{ + addressToServer: make(map[raft.ServerAddress]*ServerParts), + idToServer: make(map[raft.ServerID]*ServerParts), + } +} + +func (sl *ServerLookup) AddServer(server *ServerParts) { + sl.lock.Lock() + defer sl.lock.Unlock() + sl.addressToServer[raft.ServerAddress(server.RPCAddr.String())] = server + sl.idToServer[raft.ServerID(server.ID)] = server +} + +func (sl *ServerLookup) RemoveServer(server *ServerParts) { + sl.lock.Lock() + defer sl.lock.Unlock() + delete(sl.addressToServer, raft.ServerAddress(server.RPCAddr.String())) + delete(sl.idToServer, raft.ServerID(server.ID)) +} + +// Implements the ServerAddressProvider interface +func (sl *ServerLookup) ServerAddr(id raft.ServerID) (raft.ServerAddress, error) { + sl.lock.RLock() + defer sl.lock.RUnlock() + svr, ok := sl.idToServer[id] + if !ok { + return "", fmt.Errorf("Could not find address for server id %v", id) + } + return raft.ServerAddress(svr.RPCAddr.String()), nil +} + +// Server looks up the server by address, returns a boolean if not found +func (sl *ServerLookup) Server(addr raft.ServerAddress) *ServerParts { + sl.lock.RLock() + defer sl.lock.RUnlock() + return sl.addressToServer[addr] +} + +func (sl *ServerLookup) Servers() []*ServerParts { + sl.lock.RLock() + defer sl.lock.RUnlock() + var ret []*ServerParts + for _, svr := range sl.addressToServer { + ret = append(ret, svr) + } + return ret +} + +func (sl *ServerLookup) CheckServers(fn func(srv *ServerParts) bool) { + sl.lock.RLock() + defer sl.lock.RUnlock() + + for _, srv := range sl.addressToServer { + if !fn(srv) { + return + } + } +} From b35eb6be1053072ac3c1a926e212c90d0cca5dcb Mon Sep 17 00:00:00 2001 From: Ivan Kripakov Date: Fri, 15 Dec 2023 22:42:26 +0400 Subject: [PATCH 02/12] minor but necessary fixes --- dkron/agent.go | 8 ++++---- dkron/server_lookup.go | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/dkron/agent.go b/dkron/agent.go index 4a05b2aea..2ce71fa7c 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -143,8 +143,9 @@ type AgentOption func(agent *Agent) // and running a Dkron instance. func NewAgent(config *Config, options ...AgentOption) *Agent { agent := &Agent{ - config: config, - retryJoinCh: make(chan error), + config: config, + retryJoinCh: make(chan error), + serverLookup: NewServerLookup(), } for _, option := range options { @@ -553,7 +554,6 @@ func (a *Agent) SetConfig(c *Config) { // StartServer launch a new dkron server process func (a *Agent) StartServer() { - a.serverLookup = NewServerLookup() if a.Store == nil { s, err := NewStore(a.logger) if err != nil { @@ -725,7 +725,7 @@ func (a *Agent) eventLoop() { case serf.EventMemberReap: a.localMemberEvent(me) case serf.EventMemberUpdate: - a.logger.WithField("event", e.String()).Info("agent: event member update") + a.localMemberEvent(me) case serf.EventUser, serf.EventQuery: // Ignore default: a.logger.WithField("event", e.String()).Warn("agent: Unhandled serf event") diff --git a/dkron/server_lookup.go b/dkron/server_lookup.go index 1ac7c31e5..2343cc433 100644 --- a/dkron/server_lookup.go +++ b/dkron/server_lookup.go @@ -16,6 +16,7 @@ type ServerLookup struct { func NewServerLookup() *ServerLookup { return &ServerLookup{ + lock: sync.RWMutex{}, addressToServer: make(map[raft.ServerAddress]*ServerParts), idToServer: make(map[raft.ServerID]*ServerParts), } From 7c39d27d16383edf5df6c42dcade253a28f4a6da Mon Sep 17 00:00:00 2001 From: Ivan Kripakov Date: Mon, 18 Dec 2023 16:50:45 +0400 Subject: [PATCH 03/12] handle memberupdate event --- dkron/agent.go | 1 + dkron/serf.go | 15 ++++++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/dkron/agent.go b/dkron/agent.go index 2ce71fa7c..d14de14fc 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -725,6 +725,7 @@ func (a *Agent) eventLoop() { case serf.EventMemberReap: a.localMemberEvent(me) case serf.EventMemberUpdate: + a.lanNodeUpdate(me) a.localMemberEvent(me) case serf.EventUser, serf.EventQuery: // Ignore default: diff --git a/dkron/serf.go b/dkron/serf.go index 2e18f174c..980139dc8 100644 --- a/dkron/serf.go +++ b/dkron/serf.go @@ -21,7 +21,7 @@ func (a *Agent) nodeJoin(me serf.MemberEvent) { a.logger.WithField("member", m.Name).Warn("non-server in gossip pool") continue } - a.logger.WithField("server", parts.Name).Info("adding server") + a.logger.WithField("server", parts.Name).Info("Adding LAN adding server") a.serverLookup.AddServer(parts) // Check if this server is known found := false @@ -201,3 +201,16 @@ func (a *Agent) localMemberEvent(me serf.MemberEvent) { } } } + +func (a *Agent) lanNodeUpdate(me serf.MemberEvent) { + for _, m := range me.Members { + ok, parts := isServer(m) + if !ok { + continue + } + a.logger.Info("Updating LAN server", "server", parts.String()) + + // Update server lookup + a.serverLookup.AddServer(parts) + } +} From 00c861724b86b011e2a4d4951f2b182fe456f2e0 Mon Sep 17 00:00:00 2001 From: Ivan Kripakov Date: Wed, 20 Dec 2023 14:25:04 +0400 Subject: [PATCH 04/12] query other servers before start & add test --- dkron/agent.go | 4 -- dkron/agent_test.go | 94 +++++++++++++++++++++++++++++++++++++++++- dkron/serf.go | 45 +++++++++++++++++++- dkron/server_lookup.go | 2 +- 4 files changed, 138 insertions(+), 7 deletions(-) diff --git a/dkron/agent.go b/dkron/agent.go index d14de14fc..e51787643 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -327,10 +327,6 @@ func (a *Agent) setupRaft() error { ServerAddressProvider: serverAddressProvider, } transport := raft.NewNetworkTransportWithConfig(transConfig) - rpcIP := net.ParseIP(a.config.Tags["rpc_addr"]) - port, err := strconv.Atoi(a.config.Tags["port"]) - rpcAddr := &net.TCPAddr{IP: rpcIP, Port: port} - a.serverLookup.AddServer(&ServerParts{ID: a.config.NodeName, RPCAddr: rpcAddr}) a.raftTransport = transport config := raft.DefaultConfig() diff --git a/dkron/agent_test.go b/dkron/agent_test.go index 37ddc4ba1..a9a33b0d0 100644 --- a/dkron/agent_test.go +++ b/dkron/agent_test.go @@ -16,7 +16,7 @@ import ( ) var ( - logLevel = "error" + logLevel = "info" ) func TestAgentCommand_runForElection(t *testing.T) { @@ -636,3 +636,95 @@ func Test_selectNodes(t *testing.T) { }) } } + +func Test_clusterWillRecoverAfterIpChange(t *testing.T) { + a1, rfn1 := buildAndRunAgent("test8", []string{}, 3) + defer rfn1() + a2, rfn2 := buildAndRunAgent("test9", []string{a1.bindRPCAddr()[:len(a1.bindRPCAddr())-4] + "8946"}, 3) + defer rfn2() + a3, rfn3 := buildAndRunAgent("test10", []string{a1.bindRPCAddr()[:len(a1.bindRPCAddr())-4] + "8946", a2.bindRPCAddr()[:len(a2.bindRPCAddr())-4] + "8946"}, 3) + defer rfn3() + time.Sleep(2 * time.Second) + assert.True(t, a1.IsLeader() || a2.IsLeader() || a3.IsLeader()) + servers := a1.raft.GetConfiguration().Configuration().Servers + assert.Equal(t, 3, len(servers)) + servers = a2.raft.GetConfiguration().Configuration().Servers + assert.Equal(t, 3, len(servers)) + servers = a3.raft.GetConfiguration().Configuration().Servers + assert.Equal(t, 3, len(servers)) + + _ = a1.Stop() + + time.Sleep(30 * time.Second) + + assert.True(t, !a1.IsLeader() && (a2.IsLeader() || a3.IsLeader())) + + //servers = a2.raft.GetConfiguration().Configuration().Servers + //assert.Equal(t, 2, len(servers)) + //servers = a3.raft.GetConfiguration().Configuration().Servers + //assert.Equal(t, 2, len(servers)) + + _ = a2.Stop() + + time.Sleep(20 * time.Second) + + assert.True(t, !a1.IsLeader() && !a2.IsLeader() && !a3.IsLeader()) + + //servers = a3.raft.GetConfiguration().Configuration().Servers + //assert.Equal(t, 1, len(servers)) + + a1, rfn1 = buildAndRunAgent("test8", []string{a3.bindRPCAddr()[:len(a3.bindRPCAddr())-4] + "8946"}, 3) + defer rfn1() + a2, rfn2 = buildAndRunAgent("test9", []string{a3.bindRPCAddr()[:len(a3.bindRPCAddr())-4] + "8946"}, 3) + defer rfn2() + + time.Sleep(10 * time.Second) + + assert.True(t, a1.IsLeader() || a2.IsLeader() || a3.IsLeader()) + servers = a1.raft.GetConfiguration().Configuration().Servers + assert.Equal(t, 3, len(servers)) + servers = a2.raft.GetConfiguration().Configuration().Servers + assert.Equal(t, 3, len(servers)) + servers = a3.raft.GetConfiguration().Configuration().Servers + assert.Equal(t, 3, len(servers)) +} + +func buildAndRunAgent( + nodeName string, + startJoin []string, + bootstrapExpect int, +) (*Agent, func()) { + + dir, err := os.MkdirTemp("", fmt.Sprintf("test-%s", nodeName)) + if err != nil { + panic(err.Error()) + } + defer os.RemoveAll(dir) + ip, returnFn := testutil.TakeIP() + defer returnFn() + addr := ip.String() + + // Start another agent + c := DefaultConfig() + c.BindAddr = addr + c.StartJoin = startJoin + c.NodeName = nodeName + c.Server = true + c.LogLevel = logLevel + c.BootstrapExpect = bootstrapExpect + c.DevMode = true + c.DataDir = dir + c.RaftMultiplier = 1 + + a2 := NewAgent(c) + err = a2.Start() + if err != nil { + panic(err.Error()) + } + + return a2, func() { + _ = a2.Stop() + returnFn() + _ = os.RemoveAll(dir) + } +} diff --git a/dkron/serf.go b/dkron/serf.go index 980139dc8..249f2651d 100644 --- a/dkron/serf.go +++ b/dkron/serf.go @@ -2,6 +2,7 @@ package dkron import ( "strings" + "time" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" @@ -11,6 +12,9 @@ const ( // StatusReap is used to update the status of a node if we // are handling a EventMemberReap StatusReap = serf.MemberStatus(-1) + + // maxPeerRetries limits how many invalidate attempts are made + maxPeerRetries = 6 ) // nodeJoin is used to handle join events on the serf cluster @@ -110,7 +114,46 @@ func (a *Agent) maybeBootstrap() { return } - // TODO: Query each of the servers and make sure they report no Raft peers. + // Query each of the servers and make sure they report no Raft peers. + for _, server := range servers { + var peers []string + + // Retry with exponential backoff to get peer status from this server + for attempt := uint(0); attempt < maxPeerRetries; attempt++ { + configuration, err := a.GRPCClient.RaftGetConfiguration(server.RPCAddr.String()) + if err != nil { + nextRetry := (1 << attempt) * time.Second + a.logger.Error("Failed to confirm peer status for server (will retry).", + "server", server.Name, + "retry_interval", nextRetry.String(), + "error", err, + ) + time.Sleep(nextRetry) + } else { + for _, peer := range configuration.Servers { + peers = append(peers, peer.Id) + } + break + } + } + + // Found a node with some Raft peers, stop bootstrap since there's + // evidence of an existing cluster. We should get folded in by the + // existing servers if that's the case, so it's cleaner to sit as a + // candidate with no peers so we don't cause spurious elections. + // It's OK this is racy, because even with an initial bootstrap + // as long as one peer runs bootstrap things will work, and if we + // have multiple peers bootstrap in the same way, that's OK. We + // just don't want a server added much later to do a live bootstrap + // and interfere with the cluster. This isn't required for Raft's + // correctness because no server in the existing cluster will vote + // for this server, but it makes things much more stable. + if len(peers) > 0 { + a.logger.Info("Existing Raft peers reported by server, disabling bootstrap mode", "server", server.Name) + a.config.BootstrapExpect = 0 + return + } + } // Update the peer set // Attempt a live bootstrap! diff --git a/dkron/server_lookup.go b/dkron/server_lookup.go index 2343cc433..eac4038c9 100644 --- a/dkron/server_lookup.go +++ b/dkron/server_lookup.go @@ -26,7 +26,7 @@ func (sl *ServerLookup) AddServer(server *ServerParts) { sl.lock.Lock() defer sl.lock.Unlock() sl.addressToServer[raft.ServerAddress(server.RPCAddr.String())] = server - sl.idToServer[raft.ServerID(server.ID)] = server + sl.idToServer[raft.ServerID(server.Name)] = server } func (sl *ServerLookup) RemoveServer(server *ServerParts) { From 6c5da3fda5daf9ae27447c29504d64665af4d6e1 Mon Sep 17 00:00:00 2001 From: Ivan Kripakov Date: Wed, 20 Dec 2023 16:59:58 +0400 Subject: [PATCH 05/12] don't remove itself if node is a raft leader --- dkron/leader.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/dkron/leader.go b/dkron/leader.go index 1605cbe6e..27d169f57 100644 --- a/dkron/leader.go +++ b/dkron/leader.go @@ -3,6 +3,7 @@ package dkron import ( "fmt" "net" + "strings" "sync" "time" @@ -315,6 +316,15 @@ func (a *Agent) addRaftPeer(m serf.Member, parts *ServerParts) error { // removeRaftPeer is used to remove a Raft peer when a dkron server leaves // or is reaped func (a *Agent) removeRaftPeer(m serf.Member, parts *ServerParts) error { + + // Do not remove ourself. This can only happen if the current leader + // is leaving. Instead, we should allow a follower to take-over and + // deregister us later. + if strings.EqualFold(m.Name, a.config.NodeName) { + a.logger.Warn("removing self should be done by follower", "name", a.config.NodeName) + return nil + } + // See if it's already in the configuration. It's harmless to re-remove it // but we want to avoid doing that if possible to prevent useless Raft // log entries. From a10e8529f39f7e2c59e08580c380c9998fce22d5 Mon Sep 17 00:00:00 2001 From: Ivan Kripakov Date: Wed, 20 Dec 2023 17:32:59 +0400 Subject: [PATCH 06/12] don't remove dkron server node if id matches --- dkron/leader.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/dkron/leader.go b/dkron/leader.go index 27d169f57..1d3119517 100644 --- a/dkron/leader.go +++ b/dkron/leader.go @@ -285,17 +285,12 @@ func (a *Agent) addRaftPeer(m serf.Member, parts *ServerParts) error { if server.Address == raft.ServerAddress(addr) && server.ID == raft.ServerID(parts.ID) { return nil } - future := a.raft.RemoveServer(server.ID, 0, 0) if server.Address == raft.ServerAddress(addr) { + future := a.raft.RemoveServer(server.ID, 0, 0) if err := future.Error(); err != nil { return fmt.Errorf("error removing server with duplicate address %q: %s", server.Address, err) } a.logger.WithField("server", server.Address).Info("dkron: removed server with duplicate address") - } else { - if err := future.Error(); err != nil { - return fmt.Errorf("error removing server with duplicate ID %q: %s", server.ID, err) - } - a.logger.WithField("server", server.ID).Info("dkron: removed server with duplicate ID") } } } From cbb6cb52b18a874fc61999af5e2af3d2c5f119e1 Mon Sep 17 00:00:00 2001 From: Ivan Kripakov Date: Thu, 21 Dec 2023 11:20:30 +0400 Subject: [PATCH 07/12] remove test at the moment (is too slow) --- dkron/agent_test.go | 92 --------------------------------------------- 1 file changed, 92 deletions(-) diff --git a/dkron/agent_test.go b/dkron/agent_test.go index a9a33b0d0..78d3d62d2 100644 --- a/dkron/agent_test.go +++ b/dkron/agent_test.go @@ -636,95 +636,3 @@ func Test_selectNodes(t *testing.T) { }) } } - -func Test_clusterWillRecoverAfterIpChange(t *testing.T) { - a1, rfn1 := buildAndRunAgent("test8", []string{}, 3) - defer rfn1() - a2, rfn2 := buildAndRunAgent("test9", []string{a1.bindRPCAddr()[:len(a1.bindRPCAddr())-4] + "8946"}, 3) - defer rfn2() - a3, rfn3 := buildAndRunAgent("test10", []string{a1.bindRPCAddr()[:len(a1.bindRPCAddr())-4] + "8946", a2.bindRPCAddr()[:len(a2.bindRPCAddr())-4] + "8946"}, 3) - defer rfn3() - time.Sleep(2 * time.Second) - assert.True(t, a1.IsLeader() || a2.IsLeader() || a3.IsLeader()) - servers := a1.raft.GetConfiguration().Configuration().Servers - assert.Equal(t, 3, len(servers)) - servers = a2.raft.GetConfiguration().Configuration().Servers - assert.Equal(t, 3, len(servers)) - servers = a3.raft.GetConfiguration().Configuration().Servers - assert.Equal(t, 3, len(servers)) - - _ = a1.Stop() - - time.Sleep(30 * time.Second) - - assert.True(t, !a1.IsLeader() && (a2.IsLeader() || a3.IsLeader())) - - //servers = a2.raft.GetConfiguration().Configuration().Servers - //assert.Equal(t, 2, len(servers)) - //servers = a3.raft.GetConfiguration().Configuration().Servers - //assert.Equal(t, 2, len(servers)) - - _ = a2.Stop() - - time.Sleep(20 * time.Second) - - assert.True(t, !a1.IsLeader() && !a2.IsLeader() && !a3.IsLeader()) - - //servers = a3.raft.GetConfiguration().Configuration().Servers - //assert.Equal(t, 1, len(servers)) - - a1, rfn1 = buildAndRunAgent("test8", []string{a3.bindRPCAddr()[:len(a3.bindRPCAddr())-4] + "8946"}, 3) - defer rfn1() - a2, rfn2 = buildAndRunAgent("test9", []string{a3.bindRPCAddr()[:len(a3.bindRPCAddr())-4] + "8946"}, 3) - defer rfn2() - - time.Sleep(10 * time.Second) - - assert.True(t, a1.IsLeader() || a2.IsLeader() || a3.IsLeader()) - servers = a1.raft.GetConfiguration().Configuration().Servers - assert.Equal(t, 3, len(servers)) - servers = a2.raft.GetConfiguration().Configuration().Servers - assert.Equal(t, 3, len(servers)) - servers = a3.raft.GetConfiguration().Configuration().Servers - assert.Equal(t, 3, len(servers)) -} - -func buildAndRunAgent( - nodeName string, - startJoin []string, - bootstrapExpect int, -) (*Agent, func()) { - - dir, err := os.MkdirTemp("", fmt.Sprintf("test-%s", nodeName)) - if err != nil { - panic(err.Error()) - } - defer os.RemoveAll(dir) - ip, returnFn := testutil.TakeIP() - defer returnFn() - addr := ip.String() - - // Start another agent - c := DefaultConfig() - c.BindAddr = addr - c.StartJoin = startJoin - c.NodeName = nodeName - c.Server = true - c.LogLevel = logLevel - c.BootstrapExpect = bootstrapExpect - c.DevMode = true - c.DataDir = dir - c.RaftMultiplier = 1 - - a2 := NewAgent(c) - err = a2.Start() - if err != nil { - panic(err.Error()) - } - - return a2, func() { - _ = a2.Stop() - returnFn() - _ = os.RemoveAll(dir) - } -} From 16a002b59bd306513e88b87c96a919de1ae5d211 Mon Sep 17 00:00:00 2001 From: Ivan Kripakov Date: Thu, 21 Dec 2023 12:10:25 +0400 Subject: [PATCH 08/12] use raft timeout from consts --- dkron/agent.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/dkron/agent.go b/dkron/agent.go index e51787643..5f163cd77 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -318,13 +318,11 @@ func (a *Agent) setupRaft() error { logger = a.logger.Logger.Writer() } - var serverAddressProvider raft.ServerAddressProvider = a.serverLookup - transConfig := &raft.NetworkTransportConfig{ Stream: a.raftLayer, MaxPool: 3, - Timeout: 10 * time.Second, - ServerAddressProvider: serverAddressProvider, + Timeout: raftTimeout, + ServerAddressProvider: a.serverLookup, } transport := raft.NewNetworkTransportWithConfig(transConfig) a.raftTransport = transport From 986ef635a61e4eb07c02194090158361ae8b588f Mon Sep 17 00:00:00 2001 From: Ivan Kripakov Date: Thu, 21 Dec 2023 12:13:38 +0400 Subject: [PATCH 09/12] fix log --- dkron/serf.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dkron/serf.go b/dkron/serf.go index 249f2651d..5398ddbd8 100644 --- a/dkron/serf.go +++ b/dkron/serf.go @@ -251,7 +251,7 @@ func (a *Agent) lanNodeUpdate(me serf.MemberEvent) { if !ok { continue } - a.logger.Info("Updating LAN server", "server", parts.String()) + a.logger.WithField("server", parts.String()).Info("Updating LAN server") // Update server lookup a.serverLookup.AddServer(parts) From 2d3e3d968334454ca6354c5bfdab0986db2b4cea Mon Sep 17 00:00:00 2001 From: Ivan Kripakov Date: Thu, 4 Jan 2024 18:33:26 +0400 Subject: [PATCH 10/12] add simple server_lookup_test.go & small fix (use ID instead of name in server_lookup.go) --- dkron/server_lookup.go | 2 +- dkron/server_lookup_test.go | 92 +++++++++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+), 1 deletion(-) create mode 100644 dkron/server_lookup_test.go diff --git a/dkron/server_lookup.go b/dkron/server_lookup.go index eac4038c9..2343cc433 100644 --- a/dkron/server_lookup.go +++ b/dkron/server_lookup.go @@ -26,7 +26,7 @@ func (sl *ServerLookup) AddServer(server *ServerParts) { sl.lock.Lock() defer sl.lock.Unlock() sl.addressToServer[raft.ServerAddress(server.RPCAddr.String())] = server - sl.idToServer[raft.ServerID(server.Name)] = server + sl.idToServer[raft.ServerID(server.ID)] = server } func (sl *ServerLookup) RemoveServer(server *ServerParts) { diff --git a/dkron/server_lookup_test.go b/dkron/server_lookup_test.go new file mode 100644 index 000000000..559fb9a8e --- /dev/null +++ b/dkron/server_lookup_test.go @@ -0,0 +1,92 @@ +package dkron + +import ( + "testing" + + "github.com/hashicorp/raft" + "github.com/stretchr/testify/require" +) + +type testAddr struct { + addr string +} + +func (ta *testAddr) Network() string { + return "tcp" +} + +func (ta *testAddr) String() string { + return ta.addr +} + +func TestAddServer(t *testing.T) { + // arrange + lookup := NewServerLookup() + id1, addr1 := "svr-1", "127.0.0.1:8300" + id2, addr2 := "svr-2", "127.0.0.2:8300" + server1, server2 := buildServerParts(id1, addr1), buildServerParts(id2, addr2) + + // act + lookup.AddServer(server1) + lookup.AddServer(server2) + + // assert + servers := lookup.Servers() + require.EqualValuesf(t, 2, len(servers), "Expected 1 servers but got %v", len(servers)) + require.Containsf(t, servers, server1, "Expected server %v to be in the list of servers", server1) + require.Containsf(t, servers, server2, "Expected server %v to be in the list of servers", server2) + + got, err := lookup.ServerAddr(raft.ServerID(id1)) + require.NoErrorf(t, err, "Unexpected error: %v", err) + require.EqualValuesf(t, addr1, string(got), "Expected %v but got %v", addr1, got) + + server := lookup.Server(raft.ServerAddress(addr1)) + strAddr := server.RPCAddr.String() + require.EqualValuesf(t, addr1, strAddr, "Expected lookup to return address %v but got %v", addr1, strAddr) + + got, err = lookup.ServerAddr(raft.ServerID(id2)) + require.NoErrorf(t, err, "Unexpected error: %v", err) + require.EqualValuesf(t, addr2, string(got), "Expected %v but got %v", addr2, got) + + server = lookup.Server(raft.ServerAddress(addr2)) + strAddr = server.RPCAddr.String() + require.EqualValuesf(t, addr2, strAddr, "Expected lookup to return address %v but got %v", addr2, strAddr) +} + +func TestRemoveServer(t *testing.T) { + // arrange + lookup := NewServerLookup() + id1, addr1 := "svr-1", "127.0.0.1:8300" + id2, addr2 := "svr-2", "127.0.0.2:8300" + server1, server2 := buildServerParts(id1, addr1), buildServerParts(id2, addr2) + lookup.AddServer(server1) + lookup.AddServer(server2) + + // act + lookup.RemoveServer(server1) + + // assert + servers := lookup.Servers() + require.EqualValuesf(t, 1, len(servers), "Expected 0 servers but got %v", len(servers)) + require.Containsf(t, servers, server2, "Expected server %v to be in the list of servers", server2) + + require.Nilf(t, lookup.Server(raft.ServerAddress(addr1)), "Expected lookup to return nil") + addr, err := lookup.ServerAddr(raft.ServerID(id1)) + require.Errorf(t, err, "Expected lookup to return error") + require.EqualValuesf(t, "", string(addr), "Expected empty address but got %v", addr) + + got, err := lookup.ServerAddr(raft.ServerID(id2)) + require.NoErrorf(t, err, "Unexpected error: %v", err) + require.EqualValuesf(t, addr2, string(got), "Expected %v but got %v", addr2, got) + + server := lookup.Server(raft.ServerAddress(addr2)) + strAddr := server.RPCAddr.String() + require.EqualValuesf(t, addr2, strAddr, "Expected lookup to return address %v but got %v", addr2, strAddr) +} + +func buildServerParts(id, addr string) *ServerParts { + return &ServerParts{ + ID: id, + RPCAddr: &testAddr{addr}, + } +} From 1b4ef3fcbf3f2908f48d37f89e2dafa828378639 Mon Sep 17 00:00:00 2001 From: Ivan Kripakov Date: Thu, 4 Jan 2024 18:36:18 +0400 Subject: [PATCH 11/12] minor fix in test --- dkron/server_lookup_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dkron/server_lookup_test.go b/dkron/server_lookup_test.go index 559fb9a8e..2c185dde6 100644 --- a/dkron/server_lookup_test.go +++ b/dkron/server_lookup_test.go @@ -22,8 +22,8 @@ func (ta *testAddr) String() string { func TestAddServer(t *testing.T) { // arrange lookup := NewServerLookup() - id1, addr1 := "svr-1", "127.0.0.1:8300" - id2, addr2 := "svr-2", "127.0.0.2:8300" + id1, addr1 := "server-1", "127.0.0.1:8300" + id2, addr2 := "server-2", "127.0.0.2:8300" server1, server2 := buildServerParts(id1, addr1), buildServerParts(id2, addr2) // act @@ -56,8 +56,8 @@ func TestAddServer(t *testing.T) { func TestRemoveServer(t *testing.T) { // arrange lookup := NewServerLookup() - id1, addr1 := "svr-1", "127.0.0.1:8300" - id2, addr2 := "svr-2", "127.0.0.2:8300" + id1, addr1 := "server-1", "127.0.0.1:8300" + id2, addr2 := "server-2", "127.0.0.2:8300" server1, server2 := buildServerParts(id1, addr1), buildServerParts(id2, addr2) lookup.AddServer(server1) lookup.AddServer(server2) From f1307c130038bd358da1b36f1ccd5ba22b0166e7 Mon Sep 17 00:00:00 2001 From: Ivan Kripakov Date: Mon, 8 Jan 2024 18:29:37 +0400 Subject: [PATCH 12/12] minor simplification in tests --- dkron/server_lookup_test.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/dkron/server_lookup_test.go b/dkron/server_lookup_test.go index 2c185dde6..92a59aa6d 100644 --- a/dkron/server_lookup_test.go +++ b/dkron/server_lookup_test.go @@ -32,9 +32,8 @@ func TestAddServer(t *testing.T) { // assert servers := lookup.Servers() - require.EqualValuesf(t, 2, len(servers), "Expected 1 servers but got %v", len(servers)) - require.Containsf(t, servers, server1, "Expected server %v to be in the list of servers", server1) - require.Containsf(t, servers, server2, "Expected server %v to be in the list of servers", server2) + expectedServers := []*ServerParts{server1, server2} + require.EqualValuesf(t, expectedServers, servers, "Expected %v but got %v", expectedServers, servers) got, err := lookup.ServerAddr(raft.ServerID(id1)) require.NoErrorf(t, err, "Unexpected error: %v", err) @@ -67,8 +66,8 @@ func TestRemoveServer(t *testing.T) { // assert servers := lookup.Servers() - require.EqualValuesf(t, 1, len(servers), "Expected 0 servers but got %v", len(servers)) - require.Containsf(t, servers, server2, "Expected server %v to be in the list of servers", server2) + expectedServers := []*ServerParts{server2} + require.EqualValuesf(t, expectedServers, servers, "Expected %v but got %v", expectedServers, servers) require.Nilf(t, lookup.Server(raft.ServerAddress(addr1)), "Expected lookup to return nil") addr, err := lookup.ServerAddr(raft.ServerID(id1))