From f2fc5e05cb6084c007c4fa8f2a1df515e2e73a74 Mon Sep 17 00:00:00 2001 From: Cedric Fung Date: Tue, 7 May 2024 08:28:44 +0000 Subject: [PATCH] fix transaction cache queue broadcast destination peer selection --- kernel/node.go | 11 +++++++++++ kernel/queue.go | 27 +++++++++++++++------------ rpc/consensus_test.go | 9 ++++++--- 3 files changed, 32 insertions(+), 15 deletions(-) diff --git a/kernel/node.go b/kernel/node.go index 53e75f299..7a7327371 100644 --- a/kernel/node.go +++ b/kernel/node.go @@ -215,6 +215,17 @@ func (node *Node) PledgingNode(timestamp uint64) *CNode { return nil } +func (node *Node) ListWorkingAcceptedNodes(timestamp uint64) []*CNode { + nodes := node.NodesListWithoutState(timestamp, true) + if len(nodes) == 0 { + return nodes + } + if node.GetRemovingOrSlashingNode(nodes[0].IdForNetwork) != nil { + return nodes[1:] + } + return nodes +} + func (node *Node) GetAcceptedOrPledgingNode(id crypto.Hash) *CNode { nodes := node.NodesListWithoutState(uint64(clock.Now().UnixNano()), false) for _, cn := range nodes { diff --git a/kernel/queue.go b/kernel/queue.go index a08ef620b..85ea848f5 100644 --- a/kernel/queue.go +++ b/kernel/queue.go @@ -59,8 +59,8 @@ func (node *Node) loopCacheQueue() error { continue } - neighbors := node.Peer.Neighbors() - if len(neighbors) <= 0 { + working := node.ListWorkingAcceptedNodes(uint64(clock.Now().UnixNano())) + if len(working) <= 0 { continue } var stale []crypto.Hash @@ -93,19 +93,22 @@ func (node *Node) loopCacheQueue() error { nbor := node.electSnapshotNode(tx.TransactionType(), uint64(now.UnixNano())) if !nbor.HasValue() { hb := new(big.Int).SetBytes(hash[:]) - mb := big.NewInt(now.Unix() / 60) + mb := big.NewInt(now.UnixNano() / int64(time.Minute)) ib := new(big.Int).Add(hb, mb) - idx := new(big.Int).Mod(ib, big.NewInt(int64(len(neighbors)))) - nbor = neighbors[idx.Int64()].IdForNetwork + idx := new(big.Int).Mod(ib, big.NewInt(int64(len(working)))) + nbor = working[idx.Int64()].IdForNetwork } - node.SendTransactionToPeer(nbor, hash) - - s := &common.Snapshot{ - Version: common.SnapshotVersionCommonEncoding, - NodeId: node.IdForNetwork, + if nbor != node.IdForNetwork { + err := node.SendTransactionToPeer(nbor, hash) + logger.Debugf("queue.SendTransactionToPeer(%s, %s) => %v", hash, nbor, err) + } else { + s := &common.Snapshot{ + Version: common.SnapshotVersionCommonEncoding, + NodeId: node.IdForNetwork, + } + s.AddSoleTransaction(hash) + node.chain.AppendSelfEmpty(s) } - s.AddSoleTransaction(hash) - node.chain.AppendSelfEmpty(s) } if err != nil { logger.Printf("LoopCacheQueue CacheRetrieveTransactions ERROR %s\n", err) diff --git a/rpc/consensus_test.go b/rpc/consensus_test.go index b4a1317f9..8e28e095e 100644 --- a/rpc/consensus_test.go +++ b/rpc/consensus_test.go @@ -107,7 +107,8 @@ func testConsensus(t *testing.T, withRelayers bool) { require.Equal(transactionsCount, len(tl)) require.Equal(transactionsCount, len(sl)) gt := testVerifyInfo(require, nodes) - require.Truef(gt.Timestamp.Before(epoch.Add(1*time.Second)), "%s should before %s", gt.Timestamp, epoch.Add(1*time.Second)) + gts := epoch.Add(time.Second) + require.Truef(gt.Timestamp.Before(gts), "%s should before %s", gt.Timestamp, gts) genesisAmount := (13439 + 3.5) / float64(INPUTS) domainAddress := accounts[0].String() @@ -129,7 +130,8 @@ func testConsensus(t *testing.T, withRelayers bool) { testVerifyDeposits(require, nodes, deposits) gt = testVerifyInfo(require, nodes) - require.Truef(gt.Timestamp.Before(epoch.Add(20*time.Second)), "%s should before %s", gt.Timestamp, epoch.Add(20*time.Second)) + gts = epoch.Add(20 * time.Second) + require.Truef(gt.Timestamp.Before(gts), "%s should before %s", gt.Timestamp, gts) hr := testDumpGraphHead(nodes[0].Host, instances[0].IdForNetwork) require.NotNil(hr) require.GreaterOrEqual(hr.Round, uint64(0)) @@ -157,7 +159,8 @@ func testConsensus(t *testing.T, withRelayers bool) { require.Equal(transactionsCount, len(tl)) gt = testVerifyInfo(require, nodes) - require.True(gt.Timestamp.Before(epoch.Add(31 * time.Second))) + gts = epoch.Add(31 * time.Second) + require.Truef(gt.Timestamp.Before(gts), "%s should before %s", gt.Timestamp, gts) hr = testDumpGraphHead(nodes[0].Host, instances[0].IdForNetwork) require.NotNil(hr) require.Greater(hr.Round, uint64(0))