From 1ab985b747b37a22dfd07c5c69017b6227ffd27a Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Wed, 29 Jan 2025 05:52:38 -0800 Subject: [PATCH 01/12] Revert "Revert "quic: increase timeout and keep alive (#4585)" (#4637)" This reverts commit 0fed49acc995ec8fbf86eaf91d7332128bbe308f. --- sdk/quic-definitions/src/lib.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sdk/quic-definitions/src/lib.rs b/sdk/quic-definitions/src/lib.rs index 244073216b49ec..c60373f24e35e1 100644 --- a/sdk/quic-definitions/src/lib.rs +++ b/sdk/quic-definitions/src/lib.rs @@ -16,8 +16,12 @@ pub const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: usize = 100_000; // forwarded packets from staked nodes. pub const QUIC_MAX_STAKED_CONCURRENT_STREAMS: usize = 512; -pub const QUIC_MAX_TIMEOUT: Duration = Duration::from_secs(2); -pub const QUIC_KEEP_ALIVE: Duration = Duration::from_secs(1); +// Connection idle timeout, and keep alive. +// Quic will close the connection after QUIC_MAX_TIMEOUT, +// and send a ping every QUIC_KEEP_ALIVE. +// These shouldn't be too low to avoid unnecessary ping traffic. +pub const QUIC_MAX_TIMEOUT: Duration = Duration::from_secs(60); +pub const QUIC_KEEP_ALIVE: Duration = Duration::from_secs(45); // Disable Quic send fairness. // When set to false, streams are still scheduled based on priority, From b3fbb0c828e54c70b76c7efd2da3d05b5c351fc1 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 30 Jan 2025 06:05:50 -0800 Subject: [PATCH 02/12] added debug messages --- local-cluster/tests/local_cluster.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index e3010cd9eb7ced..821781b7dc3e31 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -1448,6 +1448,8 @@ fn test_snapshots_restart_validity() { 10, ); + trace!("Waiting for for full snapshot"); + expected_balances.extend(new_balances); cluster.wait_for_next_full_snapshot( @@ -1455,6 +1457,8 @@ fn test_snapshots_restart_validity() { Some(Duration::from_secs(5 * 60)), ); + trace!("generate_account_paths..."); + // Create new account paths since validator exit is not guaranteed to cleanup RPC threads, // which may delete the old accounts on exit at any point let (new_account_storage_dirs, new_account_storage_paths) = From 76ba9b81f5065a0368bb0f24a865139011233bc6 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 30 Jan 2025 06:38:26 -0800 Subject: [PATCH 03/12] added debug messages --- local-cluster/src/local_cluster_snapshot_utils.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/local-cluster/src/local_cluster_snapshot_utils.rs b/local-cluster/src/local_cluster_snapshot_utils.rs index 6240ade176c086..65fe06d49068ac 100644 --- a/local-cluster/src/local_cluster_snapshot_utils.rs +++ b/local-cluster/src/local_cluster_snapshot_utils.rs @@ -93,6 +93,7 @@ impl LocalCluster { if let Some(full_snapshot_archive_info) = snapshot_utils::get_highest_full_snapshot_archive_info(&full_snapshot_archives_dir) { + trace!("Got snapshot info: {:?}", full_snapshot_archive_info); match next_snapshot_type { NextSnapshotType::FullSnapshot => { if full_snapshot_archive_info.slot() >= last_slot { @@ -115,6 +116,11 @@ impl LocalCluster { } } } + } else { + trace!( + "Could not get snapshot info from {:?}", + full_snapshot_archives_dir.as_ref() + ); } if let Some(max_wait_duration) = max_wait_duration { assert!( From 6436be0c4728481a2b240d24ba51e85a22f9b51c Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Wed, 5 Feb 2025 11:17:28 -0800 Subject: [PATCH 04/12] output path on remove --- accounts-db/src/append_vec.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/accounts-db/src/append_vec.rs b/accounts-db/src/append_vec.rs index e24b18412a7d82..0a669ed1810493 100644 --- a/accounts-db/src/append_vec.rs +++ b/accounts-db/src/append_vec.rs @@ -316,6 +316,7 @@ impl Drop for AppendVec { if self.remove_file_on_drop.load(Ordering::Acquire) { // If we're reopening in readonly mode, we don't delete the file. See // AppendVec::reopen_as_readonly. + info!("Removing appendvev file {:?}", self.path); if let Err(_err) = remove_file(&self.path) { // promote this to panic soon. // disabled due to many false positive warnings while running tests. From f36457840c545a725daa809757165d73cb1f0669 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Wed, 5 Feb 2025 12:07:56 -0800 Subject: [PATCH 05/12] output path on remove --- accounts-db/src/append_vec.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/accounts-db/src/append_vec.rs b/accounts-db/src/append_vec.rs index 0a669ed1810493..d03e663736843e 100644 --- a/accounts-db/src/append_vec.rs +++ b/accounts-db/src/append_vec.rs @@ -316,7 +316,8 @@ impl Drop for AppendVec { if self.remove_file_on_drop.load(Ordering::Acquire) { // If we're reopening in readonly mode, we don't delete the file. See // AppendVec::reopen_as_readonly. - info!("Removing appendvev file {:?}", self.path); + let bt = std::backtrace::Backtrace::capture(); + info!("Removing appendvev file {:?} {:?}", self.path, bt); if let Err(_err) = remove_file(&self.path) { // promote this to panic soon. // disabled due to many false positive warnings while running tests. From b4fdcc8f72c2d0ad83c316ce2b3419d62a62f415 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Wed, 5 Feb 2025 15:25:21 -0800 Subject: [PATCH 06/12] added log for validator exit --- accounts-db/src/append_vec.rs | 2 +- streamer/src/nonblocking/quic.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/accounts-db/src/append_vec.rs b/accounts-db/src/append_vec.rs index d03e663736843e..53343bbafb7183 100644 --- a/accounts-db/src/append_vec.rs +++ b/accounts-db/src/append_vec.rs @@ -317,7 +317,7 @@ impl Drop for AppendVec { // If we're reopening in readonly mode, we don't delete the file. See // AppendVec::reopen_as_readonly. let bt = std::backtrace::Backtrace::capture(); - info!("Removing appendvev file {:?} {:?}", self.path, bt); + info!("Removing appendvec file {:?} {:?}", self.path, bt); if let Err(_err) = remove_file(&self.path) { // promote this to panic soon. // disabled due to many false positive warnings while running tests. diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index b8b3659bbd2f73..7c1d1242347a5f 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -441,6 +441,7 @@ async fn run_server( debug!("accept(): Timed out waiting for connection"); } } + info!("zzzzz quic server {:?}", endpoints[0].local_addr()); } fn prune_unstaked_connection_table( From e1306ea0cb7ea002827c8c50ae458cb2f57de9a7 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Wed, 5 Feb 2025 15:28:26 -0800 Subject: [PATCH 07/12] added log for validator exit --- streamer/src/nonblocking/quic.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 7c1d1242347a5f..1d7a3a5901890c 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -441,7 +441,7 @@ async fn run_server( debug!("accept(): Timed out waiting for connection"); } } - info!("zzzzz quic server {:?}", endpoints[0].local_addr()); + info!("zzzzz quic server {:?} {}", endpoints[0].local_addr(), name); } fn prune_unstaked_connection_table( From 1f96baf2b6ae480d1be820245ed599152450aee9 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Wed, 5 Feb 2025 16:52:33 -0800 Subject: [PATCH 08/12] more debug messages on sending txns --- local-cluster/src/local_cluster.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 049e34cb5168c0..4056c40b3d184c 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -709,6 +709,7 @@ impl LocalCluster { while now.elapsed().as_secs() < wait_time as u64 { if num_confirmed == 0 { + info!("zzzzz sending transaction {:?}, {}", transaction, attempt); client.send_transaction_to_upcoming_leaders(transaction)?; } @@ -717,6 +718,10 @@ impl LocalCluster { pending_confirmations, ) { num_confirmed = confirmed_blocks; + info!( + "zzzzz confirmed blocks: {} pending: {}", + confirmed_blocks, pending_confirmations + ); if confirmed_blocks >= pending_confirmations { return Ok(transaction.signatures[0]); } @@ -726,6 +731,8 @@ impl LocalCluster { wait_time = wait_time.max( MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed), ); + } else { + info!("zzzzz failed to poll_for_signature_confirmation num_confirmed: {} pending: {}", num_confirmed, pending_confirmations); } } info!("{attempt} tries failed transfer"); From 33581da87d32d666aa00abf92c8d6b29d4b0c8d5 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 6 Feb 2025 09:36:25 -0800 Subject: [PATCH 09/12] add signature --- Cargo.lock | 1 + transaction-metrics-tracker/Cargo.toml | 1 + transaction-metrics-tracker/src/lib.rs | 3 ++- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 8cbb211f7661a7..5dfebd74f06680 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10367,6 +10367,7 @@ version = "2.2.0" dependencies = [ "base64 0.22.1", "bincode", + "hex", "lazy_static", "log", "rand 0.8.5", diff --git a/transaction-metrics-tracker/Cargo.toml b/transaction-metrics-tracker/Cargo.toml index cdebb328609964..849483496772e0 100644 --- a/transaction-metrics-tracker/Cargo.toml +++ b/transaction-metrics-tracker/Cargo.toml @@ -13,6 +13,7 @@ edition = { workspace = true } base64 = { workspace = true } bincode = { workspace = true } # Update this borsh dependency to the workspace version once +hex = { workspace = true } lazy_static = { workspace = true } log = { workspace = true } rand = { workspace = true } diff --git a/transaction-metrics-tracker/src/lib.rs b/transaction-metrics-tracker/src/lib.rs index 8426b159f79133..d6562368b9fe8a 100644 --- a/transaction-metrics-tracker/src/lib.rs +++ b/transaction-metrics-tracker/src/lib.rs @@ -15,7 +15,8 @@ lazy_static! { pub fn should_track_transaction(signature: &[u8; SIGNATURE_BYTES]) -> bool { // We do not use the highest signature byte as it is not really random let match_portion: u16 = u16::from_le_bytes([signature[61], signature[62]]) >> 4; - trace!("Matching txn: {match_portion:016b} {:016b}", *TXN_MASK); + info!("zzzzz Txn signature: {}", hex::encode(signature)); + info!("Matching txn: {match_portion:016b} {:016b}", *TXN_MASK); *TXN_MASK == match_portion } From 194a7e82fc523748a88137e2f967d246d3a81330 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 6 Feb 2025 09:57:55 -0800 Subject: [PATCH 10/12] log signature in base58 --- Cargo.lock | 2 +- transaction-metrics-tracker/Cargo.toml | 2 +- transaction-metrics-tracker/src/lib.rs | 5 ++++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5dfebd74f06680..52f13fa80ecf93 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10367,7 +10367,7 @@ version = "2.2.0" dependencies = [ "base64 0.22.1", "bincode", - "hex", + "bs58", "lazy_static", "log", "rand 0.8.5", diff --git a/transaction-metrics-tracker/Cargo.toml b/transaction-metrics-tracker/Cargo.toml index 849483496772e0..26ea9bd2c3d507 100644 --- a/transaction-metrics-tracker/Cargo.toml +++ b/transaction-metrics-tracker/Cargo.toml @@ -13,7 +13,7 @@ edition = { workspace = true } base64 = { workspace = true } bincode = { workspace = true } # Update this borsh dependency to the workspace version once -hex = { workspace = true } +bs58 = { workspace = true } lazy_static = { workspace = true } log = { workspace = true } rand = { workspace = true } diff --git a/transaction-metrics-tracker/src/lib.rs b/transaction-metrics-tracker/src/lib.rs index d6562368b9fe8a..204c9b44d59bb9 100644 --- a/transaction-metrics-tracker/src/lib.rs +++ b/transaction-metrics-tracker/src/lib.rs @@ -15,7 +15,10 @@ lazy_static! { pub fn should_track_transaction(signature: &[u8; SIGNATURE_BYTES]) -> bool { // We do not use the highest signature byte as it is not really random let match_portion: u16 = u16::from_le_bytes([signature[61], signature[62]]) >> 4; - info!("zzzzz Txn signature: {}", hex::encode(signature)); + info!( + "zzzzz Txn signature: {}", + bs58::encode(signature).into_string() + ); info!("Matching txn: {match_portion:016b} {:016b}", *TXN_MASK); *TXN_MASK == match_portion } From c768cda226542a2e107794ae8295196feca6a2ac Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 6 Feb 2025 10:23:13 -0800 Subject: [PATCH 11/12] more debugs in send_transaction_with_retries --- local-cluster/src/local_cluster.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 4056c40b3d184c..9b26bb09f64c23 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -709,7 +709,10 @@ impl LocalCluster { while now.elapsed().as_secs() < wait_time as u64 { if num_confirmed == 0 { - info!("zzzzz sending transaction {:?}, {}", transaction, attempt); + info!( + "zzzzz sending transaction {:?}, {} over {attempts}", + transaction, attempt + ); client.send_transaction_to_upcoming_leaders(transaction)?; } @@ -719,7 +722,7 @@ impl LocalCluster { ) { num_confirmed = confirmed_blocks; info!( - "zzzzz confirmed blocks: {} pending: {}", + "zzzzz confirmed blocks: {} pending: {} for {transaction:?}", confirmed_blocks, pending_confirmations ); if confirmed_blocks >= pending_confirmations { @@ -732,10 +735,10 @@ impl LocalCluster { MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed), ); } else { - info!("zzzzz failed to poll_for_signature_confirmation num_confirmed: {} pending: {}", num_confirmed, pending_confirmations); + info!("zzzzz failed to poll_for_signature_confirmation num_confirmed: {} pending: {} for {transaction:?}", num_confirmed, pending_confirmations); } } - info!("{attempt} tries failed transfer"); + info!("{attempt}/{attempts} tries failed transfer"); let blockhash = client.rpc_client().get_latest_blockhash()?; transaction.sign(keypairs, blockhash); } From c82c46582ef181051058e8f8794ff2c86d9c2da0 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 6 Feb 2025 13:40:26 -0800 Subject: [PATCH 12/12] recreate connection cache after restart --- local-cluster/src/local_cluster.rs | 85 +++++++++++++++++++++--------- 1 file changed, 60 insertions(+), 25 deletions(-) diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 9b26bb09f64c23..bed19022bb728e 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -144,6 +144,13 @@ impl Default for ClusterConfig { } } +struct QuicConnectionCacheConfig { + stake: u64, + total_stake: u64, + client_keypair: Keypair, + staked_nodes: Arc>, +} + pub struct LocalCluster { /// Keypair with funding to participate in the network pub funding_keypair: Keypair, @@ -152,6 +159,8 @@ pub struct LocalCluster { pub validators: HashMap, pub genesis_config: GenesisConfig, pub connection_cache: Arc, + quic_connection_cache_config: Option, + tpu_connection_pool_size: usize, } impl LocalCluster { @@ -191,21 +200,9 @@ impl LocalCluster { pub fn new(config: &mut ClusterConfig, socket_addr_space: SocketAddrSpace) -> Self { assert_eq!(config.validator_configs.len(), config.node_stakes.len()); - let connection_cache = if config.tpu_use_quic { - let client_keypair = Keypair::new(); + let quic_connection_cache_config = if config.tpu_use_quic { + let client_keypair: Keypair = Keypair::new(); let stake = DEFAULT_NODE_STAKE; - - for validator_config in config.validator_configs.iter_mut() { - let mut overrides = HashMap::new(); - overrides.insert(client_keypair.pubkey(), stake); - validator_config.staked_nodes_overrides = Arc::new(RwLock::new(overrides)); - } - - assert!( - config.tpu_use_quic, - "no support for staked override forwarding without quic" - ); - let total_stake = config.node_stakes.iter().sum::(); let stakes = HashMap::from([ (client_keypair.pubkey(), stake), @@ -216,20 +213,26 @@ impl LocalCluster { HashMap::::default(), // overrides ))); - Arc::new(ConnectionCache::new_with_client_options( - "connection_cache_local_cluster_quic_staked", - config.tpu_connection_pool_size, - None, - Some((&client_keypair, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))), - Some((&staked_nodes, &client_keypair.pubkey())), - )) + for validator_config in config.validator_configs.iter_mut() { + let mut overrides = HashMap::new(); + overrides.insert(client_keypair.pubkey(), stake); + validator_config.staked_nodes_overrides = Arc::new(RwLock::new(overrides)); + } + Some(QuicConnectionCacheConfig { + stake, + total_stake, + client_keypair, + staked_nodes, + }) } else { - Arc::new(ConnectionCache::with_udp( - "connection_cache_local_cluster_udp", - config.tpu_connection_pool_size, - )) + None }; + let connection_cache = create_connection_cache( + &quic_connection_cache_config, + config.tpu_connection_pool_size, + ); + let mut validator_keys = { if let Some(ref keys) = config.validator_keys { assert_eq!(config.validator_configs.len(), keys.len()); @@ -379,6 +382,8 @@ impl LocalCluster { validators, genesis_config, connection_cache, + quic_connection_cache_config, + tpu_connection_pool_size: config.tpu_connection_pool_size, }; let node_pubkey_to_vote_key: HashMap> = keys_in_genesis @@ -983,6 +988,30 @@ impl LocalCluster { } } +fn create_connection_cache( + quic_connection_cache_config: &Option, + tpu_connection_pool_size: usize, +) -> Arc { + let connection_cache = if let Some(config) = quic_connection_cache_config { + Arc::new(ConnectionCache::new_with_client_options( + "connection_cache_local_cluster_quic_staked", + tpu_connection_pool_size, + None, + Some(( + &config.client_keypair, + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + )), + Some((&config.staked_nodes, &config.client_keypair.pubkey())), + )) + } else { + Arc::new(ConnectionCache::with_udp( + "connection_cache_local_cluster_udp", + tpu_connection_pool_size, + )) + }; + connection_cache +} + impl Cluster for LocalCluster { fn get_node_pubkeys(&self) -> Vec { self.validators.keys().cloned().collect() @@ -1069,6 +1098,12 @@ impl Cluster for LocalCluster { socket_addr_space, ); self.add_node(pubkey, cluster_validator_info); + + // reset the connection cache as we are connecting to the new nodes + self.connection_cache = create_connection_cache( + &self.quic_connection_cache_config, + self.tpu_connection_pool_size, + ); } fn add_node(&mut self, pubkey: &Pubkey, cluster_validator_info: ClusterValidatorInfo) {