Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reapply 3515: add tpu client next to sts #4758

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 29 additions & 1 deletion programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion send-transaction-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,20 @@ license = { workspace = true }
edition = { workspace = true }

[dependencies]
async-trait = { workspace = true }
crossbeam-channel = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
solana-client = { workspace = true }
solana-connection-cache = { workspace = true }
solana-keypair = { workspace = true }
solana-measure = { workspace = true }
solana-metrics = { workspace = true }
solana-runtime = { workspace = true }
solana-sdk = { workspace = true }
solana-tpu-client = { workspace = true }
solana-tpu-client-next = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio-util = { workspace = true }

[dev-dependencies]
solana-logger = { workspace = true }
Expand Down
35 changes: 30 additions & 5 deletions send-transaction-service/src/send_transaction_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,10 @@ impl SendTransactionService {
mod test {
use {
super::*,
crate::{test_utils::ClientWithCreator, tpu_info::NullTpuInfo},
crate::{
test_utils::ClientWithCreator, tpu_info::NullTpuInfo,
transaction_client::TpuClientNextClient,
},
crossbeam_channel::{bounded, unbounded},
solana_sdk::{
account::AccountSharedData,
Expand Down Expand Up @@ -541,14 +544,19 @@ mod test {

drop(sender);
send_transaction_service.join().unwrap();
client.cancel();
client.stop();
}

#[test]
fn service_exit_with_connection_cache() {
service_exit::<ConnectionCacheClient<NullTpuInfo>>(None);
}

#[tokio::test(flavor = "multi_thread")]
async fn service_exit_with_tpu_client_next() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we usually have test_ prefix. I realize this file doesn't follow that pattern, maybe we can convert it in a separate PR

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, for some reason this file is unique in this sense.

service_exit::<TpuClientNextClient<NullTpuInfo>>(Some(Handle::current()));
}

fn validator_exit<C: ClientWithCreator>(maybe_runtime: Option<Handle>) {
let bank = Bank::default_for_tests();
let bank_forks = BankForks::new_rw_arc(bank);
Expand Down Expand Up @@ -581,7 +589,7 @@ mod test {

thread::spawn(move || {
exit.store(true, Ordering::Relaxed);
client.cancel();
client.stop();
});

let mut option = Ok(());
Expand All @@ -595,6 +603,11 @@ mod test {
validator_exit::<ConnectionCacheClient<NullTpuInfo>>(None);
}

#[tokio::test(flavor = "multi_thread")]
async fn validator_exit_with_tpu_client_next() {
validator_exit::<TpuClientNextClient<NullTpuInfo>>(Some(Handle::current()));
}

fn process_transactions<C: ClientWithCreator>(maybe_runtime: Option<Handle>) {
solana_logger::setup();

Expand Down Expand Up @@ -857,14 +870,19 @@ mod test {
..ProcessTransactionsResult::default()
}
);
client.cancel();
client.stop();
}

#[test]
fn process_transactions_with_connection_cache() {
process_transactions::<ConnectionCacheClient<NullTpuInfo>>(None);
}

#[tokio::test(flavor = "multi_thread")]
async fn process_transactions_with_tpu_client_next() {
process_transactions::<TpuClientNextClient<NullTpuInfo>>(Some(Handle::current()));
}

fn retry_durable_nonce_transactions<C: ClientWithCreator>(maybe_runtime: Option<Handle>) {
solana_logger::setup();

Expand Down Expand Up @@ -1162,11 +1180,18 @@ mod test {
..ProcessTransactionsResult::default()
}
);
client.cancel();
client.stop();
}

#[test]
fn retry_durable_nonce_transactions_with_connection_cache() {
retry_durable_nonce_transactions::<ConnectionCacheClient<NullTpuInfo>>(None);
}

#[tokio::test(flavor = "multi_thread")]
async fn retry_durable_nonce_transactions_with_tpu_client_next() {
retry_durable_nonce_transactions::<TpuClientNextClient<NullTpuInfo>>(Some(
Handle::current(),
));
}
}
45 changes: 38 additions & 7 deletions send-transaction-service/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
use {
crate::{
tpu_info::NullTpuInfo,
transaction_client::{ConnectionCacheClient, TpuInfoWithSendStatic, TransactionClient},
transaction_client::{
ConnectionCacheClient, TpuClientNextClient, TpuInfoWithSendStatic, TransactionClient,
},
},
solana_client::connection_cache::ConnectionCache,
std::{net::SocketAddr, sync::Arc},
Expand Down Expand Up @@ -42,23 +44,52 @@ impl CreateClient for ConnectionCacheClient<NullTpuInfo> {
}
}

pub trait Cancelable {
fn cancel(&self);
impl CreateClient for TpuClientNextClient<NullTpuInfo> {
fn create_client(
maybe_runtime: Option<Handle>,
my_tpu_address: SocketAddr,
tpu_peers: Option<Vec<SocketAddr>>,
leader_forward_count: u64,
) -> Self {
let runtime_handle =
maybe_runtime.expect("Runtime should be provided for the TpuClientNextClient.");
Self::new(
runtime_handle,
my_tpu_address,
tpu_peers,
None,
leader_forward_count,
None,
)
}
}

pub trait Stoppable {
fn stop(&self);
}

impl<T> Cancelable for ConnectionCacheClient<T>
impl<T> Stoppable for ConnectionCacheClient<T>
where
T: TpuInfoWithSendStatic,
{
fn cancel(&self) {}
fn stop(&self) {}
}

impl<T> Stoppable for TpuClientNextClient<T>
where
T: TpuInfoWithSendStatic + Clone,
{
fn stop(&self) {
self.cancel().unwrap();
}
}

// Define type alias to simplify definition of test functions.
pub trait ClientWithCreator:
CreateClient + TransactionClient + Cancelable + Send + Clone + 'static
CreateClient + TransactionClient + Stoppable + Send + Clone + 'static
{
}
impl<T> ClientWithCreator for T where
T: CreateClient + TransactionClient + Cancelable + Send + Clone + 'static
T: CreateClient + TransactionClient + Stoppable + Send + Clone + 'static
{
}
4 changes: 0 additions & 4 deletions send-transaction-service/src/tpu_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ pub trait TpuInfo {
///
/// For example, if leader schedule was `[L1, L1, L1, L1, L2, L2, L2, L2,
/// L1, ...]` it will return `[L1, L2, L1]`.
#[allow(
dead_code,
reason = "This function will be used when tpu-client-next will be added to this module."
)]
fn get_not_unique_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr>;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would love it if we renamed get_leader_tpus to get_unique_leader_tpus and then could just name this one get_leader_tpus. Maybe follow-up PR?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This how it was, but this breaks public API. So I cannot reuse get_leader_tpus. I could have created, instead, get_not_unique_leader_tpus, get_unique_leader_tpus and leave get_leader_tpus as it is but mark deprecated. But sort of doesn't change much I think


/// In addition to the tpu address, also return the leader slot
Expand Down
Loading
Loading