Skip to content

Commit

Permalink
[benchmark] Don't obliterate validators as much when wrapping up
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-ds committed Mar 4, 2025
1 parent 2666a06 commit 3c661dc
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 23 deletions.
10 changes: 2 additions & 8 deletions linera-client/src/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ where
blocks_infos: Vec<(ChainId, Vec<Operation>, AccountSecretKey)>,
committee: Committee,
local_node: LocalNodeClient<S>,
close_chains: bool,
health_check_endpoints: Option<String>,
) -> Result<(), BenchmarkError> {
let shutdown_notifier = CancellationToken::new();
Expand Down Expand Up @@ -203,7 +202,6 @@ where
sender,
committee,
local_node,
close_chains,
)
.await?;

Expand Down Expand Up @@ -472,7 +470,6 @@ where
sender: crossbeam_channel::Sender<()>,
committee: Committee,
local_node: LocalNodeClient<S>,
close_chains: bool,
) -> Result<(), BenchmarkError> {
let chain_id = chain_client.chain_id();
info!(
Expand Down Expand Up @@ -534,16 +531,13 @@ where
}
}

if close_chains {
Self::close_benchmark_chain(chain_client).await?;
}
info!("Exiting task...");
Ok(())
}

/// Closes the chain that was created for the benchmark.
async fn close_benchmark_chain(
chain_client: ChainClient<NodeProvider, S>,
pub async fn close_benchmark_chain(
chain_client: &ChainClient<NodeProvider, S>,
) -> Result<(), BenchmarkError> {
let start = Instant::now();
chain_client
Expand Down
5 changes: 5 additions & 0 deletions linera-client/src/client_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,11 @@ pub enum ClientCommand {
/// Example: "127.0.0.1:21100,validator-1.some-network.linera.net:21100"
#[arg(long)]
health_check_endpoints: Option<String>,
/// The maximum number of in-flight requests to validators when wrapping up the benchmark.
/// While wrapping up, this controls the concurrency level when processing inboxes and
/// closing chains.
#[arg(long, default_value = "5")]
wrap_up_max_in_flight: usize,
},

/// Create genesis configuration for a Linera deployment.
Expand Down
42 changes: 27 additions & 15 deletions linera-service/src/linera/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ use linera_views::store::CommonStoreConfig;
use serde_json::Value;
use tokio::task::JoinSet;
use tracing::{debug, error, info, warn, Instrument as _};
#[cfg(feature = "benchmark")]
use {
futures::{stream, TryStreamExt},
linera_client::benchmark::BenchmarkError,
linera_core::client::ChainClientError,
};

mod net_up_utils;

Expand Down Expand Up @@ -739,6 +745,7 @@ impl Runnable for Job {
bps,
close_chains,
health_check_endpoints,
wrap_up_max_in_flight,
} => {
assert!(num_chains > 0, "Number of chains must be greater than 0");
assert!(
Expand Down Expand Up @@ -771,28 +778,33 @@ impl Runnable for Job {
blocks_infos,
committee,
context.client.local_node().clone(),
close_chains,
health_check_endpoints,
)
.await?;

if !close_chains {
if close_chains {
info!("Closing chains...");
let stream = stream::iter(chain_clients.values().cloned())
.map(|chain_client| async move {
linera_client::benchmark::Benchmark::<S>::close_benchmark_chain(
&chain_client,
)
.await?;
info!("Closed chain {:?}", chain_client.chain_id());
Ok::<(), BenchmarkError>(())
})
.buffer_unordered(wrap_up_max_in_flight);
stream.try_collect::<Vec<_>>().await?;
} else {
info!("Processing inbox for all chains...");
let mut join_set = JoinSet::<Result<(), linera_client::Error>>::new();
for chain_client in chain_clients.values() {
let chain_client = chain_client.clone();
join_set.spawn(async move {
let stream = stream::iter(chain_clients.values().cloned())
.map(|chain_client| async move {
chain_client.process_inbox().await?;
info!("Processed inbox for chain {:?}", chain_client.chain_id());
Ok(())
});
}

join_set
.join_all()
.await
.into_iter()
.collect::<Result<Vec<()>, _>>()?;
Ok::<(), ChainClientError>(())
})
.buffer_unordered(wrap_up_max_in_flight);
stream.try_collect::<Vec<_>>().await?;

info!("Updating wallet from chain clients...");
for chain_client in chain_clients.values() {
Expand Down

0 comments on commit 3c661dc

Please sign in to comment.