diff --git a/Cargo.lock b/Cargo.lock
index 8cbb211f7661a7..52f13fa80ecf93 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10367,6 +10367,7 @@ version = "2.2.0"
 dependencies = [
  "base64 0.22.1",
  "bincode",
+ "bs58",
  "lazy_static",
  "log",
  "rand 0.8.5",
diff --git a/accounts-db/src/append_vec.rs b/accounts-db/src/append_vec.rs
index e24b18412a7d82..53343bbafb7183 100644
--- a/accounts-db/src/append_vec.rs
+++ b/accounts-db/src/append_vec.rs
@@ -316,6 +316,8 @@ impl Drop for AppendVec {
         if self.remove_file_on_drop.load(Ordering::Acquire) {
             // If we're reopening in readonly mode, we don't delete the file. See
             // AppendVec::reopen_as_readonly.
+            let bt = std::backtrace::Backtrace::capture();
+            info!("Removing appendvec file {:?} {:?}", self.path, bt);
             if let Err(_err) = remove_file(&self.path) {
                 // promote this to panic soon.
                 // disabled due to many false positive warnings while running tests.
diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs
index 049e34cb5168c0..bed19022bb728e 100644
--- a/local-cluster/src/local_cluster.rs
+++ b/local-cluster/src/local_cluster.rs
@@ -144,6 +144,13 @@ impl Default for ClusterConfig {
     }
 }
 
+struct QuicConnectionCacheConfig {
+    stake: u64,
+    total_stake: u64,
+    client_keypair: Keypair,
+    staked_nodes: Arc<RwLock<StakedNodes>>,
+}
+
 pub struct LocalCluster {
     /// Keypair with funding to participate in the network
     pub funding_keypair: Keypair,
@@ -152,6 +159,8 @@ pub struct LocalCluster {
     pub validators: HashMap<Pubkey, ClusterValidatorInfo>,
     pub genesis_config: GenesisConfig,
    pub connection_cache: Arc<ConnectionCache>,
+    quic_connection_cache_config: Option<QuicConnectionCacheConfig>,
+    tpu_connection_pool_size: usize,
 }
 
 impl LocalCluster {
@@ -191,21 +200,9 @@ impl LocalCluster {
     pub fn new(config: &mut ClusterConfig, socket_addr_space: SocketAddrSpace) -> Self {
         assert_eq!(config.validator_configs.len(), config.node_stakes.len());
 
-        let connection_cache = if config.tpu_use_quic {
-            let client_keypair = Keypair::new();
+        let quic_connection_cache_config = if config.tpu_use_quic {
+            let client_keypair: Keypair = Keypair::new();
             let stake = DEFAULT_NODE_STAKE;
-
-            for validator_config in config.validator_configs.iter_mut() {
-                let mut overrides = HashMap::new();
-                overrides.insert(client_keypair.pubkey(), stake);
-                validator_config.staked_nodes_overrides = Arc::new(RwLock::new(overrides));
-            }
-
-            assert!(
-                config.tpu_use_quic,
-                "no support for staked override forwarding without quic"
-            );
-
             let total_stake = config.node_stakes.iter().sum::<u64>();
             let stakes = HashMap::from([
                 (client_keypair.pubkey(), stake),
@@ -216,20 +213,26 @@ impl LocalCluster {
                 HashMap::<Pubkey, u64>::default(), // overrides
             )));
 
-            Arc::new(ConnectionCache::new_with_client_options(
-                "connection_cache_local_cluster_quic_staked",
-                config.tpu_connection_pool_size,
-                None,
-                Some((&client_keypair, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))),
-                Some((&staked_nodes, &client_keypair.pubkey())),
-            ))
+            for validator_config in config.validator_configs.iter_mut() {
+                let mut overrides = HashMap::new();
+                overrides.insert(client_keypair.pubkey(), stake);
+                validator_config.staked_nodes_overrides = Arc::new(RwLock::new(overrides));
+            }
+            Some(QuicConnectionCacheConfig {
+                stake,
+                total_stake,
+                client_keypair,
+                staked_nodes,
+            })
         } else {
-            Arc::new(ConnectionCache::with_udp(
-                "connection_cache_local_cluster_udp",
-                config.tpu_connection_pool_size,
-            ))
+            None
         };
+        let connection_cache = create_connection_cache(
+            &quic_connection_cache_config,
+            config.tpu_connection_pool_size,
+        );
+
         let mut validator_keys = {
             if let Some(ref keys) = config.validator_keys {
                assert_eq!(config.validator_configs.len(), keys.len());
@@ -379,6 +382,8 @@ impl LocalCluster {
             validators,
             genesis_config,
             connection_cache,
+            quic_connection_cache_config,
+            tpu_connection_pool_size: config.tpu_connection_pool_size,
         };
 
         let node_pubkey_to_vote_key: HashMap<Pubkey, Arc<Keypair>> = keys_in_genesis
@@ -709,6 +714,10 @@ impl LocalCluster {
         while now.elapsed().as_secs() < wait_time as u64 {
             if num_confirmed == 0 {
+                info!(
+                    "zzzzz sending transaction {:?}, {} over {attempts}",
+                    transaction, attempt
+                );
                 client.send_transaction_to_upcoming_leaders(transaction)?;
             }
 
@@ -717,6 +726,10 @@ impl LocalCluster {
                 pending_confirmations,
             ) {
                 num_confirmed = confirmed_blocks;
+                info!(
+                    "zzzzz confirmed blocks: {} pending: {} for {transaction:?}",
+                    confirmed_blocks, pending_confirmations
+                );
                 if confirmed_blocks >= pending_confirmations {
                     return Ok(transaction.signatures[0]);
                 }
@@ -726,9 +739,11 @@ impl LocalCluster {
                 wait_time = wait_time.max(
                     MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed),
                 );
+            } else {
+                info!("zzzzz failed to poll_for_signature_confirmation num_confirmed: {} pending: {} for {transaction:?}", num_confirmed, pending_confirmations);
             }
         }
-        info!("{attempt} tries failed transfer");
+        info!("{attempt}/{attempts} tries failed transfer");
         let blockhash = client.rpc_client().get_latest_blockhash()?;
         transaction.sign(keypairs, blockhash);
     }
@@ -973,6 +988,30 @@ impl LocalCluster {
     }
 }
 
+fn create_connection_cache(
+    quic_connection_cache_config: &Option<QuicConnectionCacheConfig>,
+    tpu_connection_pool_size: usize,
+) -> Arc<ConnectionCache> {
+    if let Some(config) = quic_connection_cache_config {
+        Arc::new(ConnectionCache::new_with_client_options(
+            "connection_cache_local_cluster_quic_staked",
+            tpu_connection_pool_size,
+            None,
+            Some((
+                &config.client_keypair,
+                IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
+            )),
+            Some((&config.staked_nodes, &config.client_keypair.pubkey())),
+        ))
+    } else {
+        Arc::new(ConnectionCache::with_udp(
+            "connection_cache_local_cluster_udp",
+            tpu_connection_pool_size,
+        ))
+    }
+}
+
 impl Cluster for LocalCluster {
     fn get_node_pubkeys(&self) -> Vec<Pubkey> {
         self.validators.keys().cloned().collect()
@@ -1059,6 +1098,12 @@ impl Cluster for LocalCluster {
             socket_addr_space,
         );
         self.add_node(pubkey, cluster_validator_info);
+
+        // Reset the connection cache since we are connecting to new nodes.
+        self.connection_cache = create_connection_cache(
+            &self.quic_connection_cache_config,
+            self.tpu_connection_pool_size,
+        );
     }
 
     fn add_node(&mut self, pubkey: &Pubkey, cluster_validator_info: ClusterValidatorInfo) {
diff --git a/local-cluster/src/local_cluster_snapshot_utils.rs b/local-cluster/src/local_cluster_snapshot_utils.rs
index 6240ade176c086..65fe06d49068ac 100644
--- a/local-cluster/src/local_cluster_snapshot_utils.rs
+++ b/local-cluster/src/local_cluster_snapshot_utils.rs
@@ -93,6 +93,7 @@ impl LocalCluster {
             if let Some(full_snapshot_archive_info) =
                 snapshot_utils::get_highest_full_snapshot_archive_info(&full_snapshot_archives_dir)
             {
+                trace!("Got snapshot info: {:?}", full_snapshot_archive_info);
                 match next_snapshot_type {
                     NextSnapshotType::FullSnapshot => {
                         if full_snapshot_archive_info.slot() >= last_slot {
@@ -115,6 +116,11 @@ impl LocalCluster {
                         }
                     }
                 }
-            }
+            } else {
+                trace!(
+                    "Could not get snapshot info from {:?}",
+                    full_snapshot_archives_dir.as_ref()
+                );
+            }
             if let Some(max_wait_duration) = max_wait_duration {
                 assert!(
diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs
index e3010cd9eb7ced..821781b7dc3e31 100644
--- a/local-cluster/tests/local_cluster.rs
+++ b/local-cluster/tests/local_cluster.rs
@@ -1448,6 +1448,8 @@ fn test_snapshots_restart_validity() {
         10,
     );
 
+    trace!("Waiting for full snapshot");
+
     expected_balances.extend(new_balances);
 
     cluster.wait_for_next_full_snapshot(
@@ -1455,6 +1457,8 @@ fn test_snapshots_restart_validity() {
         Some(Duration::from_secs(5 * 60)),
     );
 
+    trace!("generate_account_paths...");
+
     // Create new account paths since validator exit is not guaranteed to cleanup RPC threads,
     // which may delete the old accounts on exit at any point
     let (new_account_storage_dirs, new_account_storage_paths) =
diff --git a/sdk/quic-definitions/src/lib.rs b/sdk/quic-definitions/src/lib.rs
index 244073216b49ec..c60373f24e35e1 100644
--- a/sdk/quic-definitions/src/lib.rs
+++ b/sdk/quic-definitions/src/lib.rs
@@ -16,8 +16,12 @@ pub const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: usize = 100_000;
 // forwarded packets from staked nodes.
 pub const QUIC_MAX_STAKED_CONCURRENT_STREAMS: usize = 512;
 
-pub const QUIC_MAX_TIMEOUT: Duration = Duration::from_secs(2);
-pub const QUIC_KEEP_ALIVE: Duration = Duration::from_secs(1);
+// Connection idle timeout and keep-alive interval.
+// QUIC will close the connection after QUIC_MAX_TIMEOUT of inactivity,
+// and send a ping every QUIC_KEEP_ALIVE to keep it alive.
+// These should not be set too low, or the pings generate unnecessary traffic.
+pub const QUIC_MAX_TIMEOUT: Duration = Duration::from_secs(60);
+pub const QUIC_KEEP_ALIVE: Duration = Duration::from_secs(45);
 
 // Disable Quic send fairness.
 // When set to false, streams are still scheduled based on priority,
diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs
index b8b3659bbd2f73..1d7a3a5901890c 100644
--- a/streamer/src/nonblocking/quic.rs
+++ b/streamer/src/nonblocking/quic.rs
@@ -441,6 +441,7 @@ async fn run_server(
             debug!("accept(): Timed out waiting for connection");
         }
     }
+    info!("zzzzz quic server {:?} {}", endpoints[0].local_addr(), name);
 }
 
 fn prune_unstaked_connection_table(
diff --git a/transaction-metrics-tracker/Cargo.toml b/transaction-metrics-tracker/Cargo.toml
index cdebb328609964..26ea9bd2c3d507 100644
--- a/transaction-metrics-tracker/Cargo.toml
+++ b/transaction-metrics-tracker/Cargo.toml
@@ -13,6 +13,7 @@ edition = { workspace = true }
 base64 = { workspace = true }
 bincode = { workspace = true }
 # Update this borsh dependency to the workspace version once
+bs58 = { workspace = true }
 lazy_static = { workspace = true }
 log = { workspace = true }
 rand = { workspace = true }
diff --git a/transaction-metrics-tracker/src/lib.rs b/transaction-metrics-tracker/src/lib.rs
index 8426b159f79133..204c9b44d59bb9 100644
--- a/transaction-metrics-tracker/src/lib.rs
+++ b/transaction-metrics-tracker/src/lib.rs
@@ -15,7 +15,11 @@ lazy_static! {
 pub fn should_track_transaction(signature: &[u8; SIGNATURE_BYTES]) -> bool {
     // We do not use the highest signature byte as it is not really random
     let match_portion: u16 = u16::from_le_bytes([signature[61], signature[62]]) >> 4;
-    trace!("Matching txn: {match_portion:016b} {:016b}", *TXN_MASK);
+    info!(
+        "zzzzz Txn signature: {}",
+        bs58::encode(signature).into_string()
+    );
+    info!("Matching txn: {match_portion:016b} {:016b}", *TXN_MASK);
     *TXN_MASK == match_portion
 }
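
Note on the sampling logic in the transaction-metrics-tracker hunk above: `should_track_transaction` samples roughly 1 in 4096 transactions by comparing a 12-bit value drawn from signature bytes 61..=62 against `TXN_MASK`. Below is a minimal, self-contained sketch of that scheme; the literal `TXN_MASK` value and the driver loop are illustrative assumptions (in the crate, `TXN_MASK` is lazily initialized, and `SIGNATURE_BYTES` comes from the SDK):

```rust
const SIGNATURE_BYTES: usize = 64;

// Stand-in for the crate's TXN_MASK; assumed to be a 12-bit value.
// Any fixed value matches ~1/4096 of uniformly random signatures.
const TXN_MASK: u16 = 0b0000_1010_1100_0011;

fn should_track_transaction(signature: &[u8; SIGNATURE_BYTES]) -> bool {
    // Take 16 bits from bytes 61..=62, then drop the low 4 bits,
    // leaving a 12-bit sample that is uniform for random signatures.
    let match_portion: u16 = u16::from_le_bytes([signature[61], signature[62]]) >> 4;
    TXN_MASK == match_portion
}

fn main() {
    // Hypothetical driver: expect roughly 1_000_000 / 4096 ≈ 244 matches.
    let mut hits = 0u32;
    let mut sig = [0u8; SIGNATURE_BYTES];
    for i in 0u32..1_000_000 {
        // Cheap deterministic stand-in for random signature bytes.
        let h = i.wrapping_mul(2_654_435_761);
        sig[61] = h as u8;
        sig[62] = (h >> 8) as u8;
        if should_track_transaction(&sig) {
            hits += 1;
        }
    }
    println!("tracked {hits} of 1000000 signatures");
}
```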
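
One constraint worth noting on the sdk/quic-definitions change: the keep-alive interval must stay below the idle timeout, or an otherwise healthy connection can idle out between pings. A sketch of a compile-time guard for that invariant; adding such an assert is an assumption, not part of the patch:

```rust
use std::time::Duration;

pub const QUIC_MAX_TIMEOUT: Duration = Duration::from_secs(60);
pub const QUIC_KEEP_ALIVE: Duration = Duration::from_secs(45);

// Compile-time guard (hypothetical): pings must fire before the idle
// timeout expires, so quiet connections are kept open.
const _: () = assert!(QUIC_KEEP_ALIVE.as_secs() < QUIC_MAX_TIMEOUT.as_secs());
```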