Skip to content

Commit

Permalink
benchmark tool injecting the native tokens txs bypassing the json-rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
ssavenko-near committed Feb 11, 2025
1 parent 5ddd776 commit 44ec8e2
Show file tree
Hide file tree
Showing 9 changed files with 321 additions and 1 deletion.
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ exclude = ["neard"]
[workspace]
resolver = "2"
members = [
"benchmarks/transactions-generator",
"chain/chain",
"chain/chunks",
"chain/client",
Expand Down Expand Up @@ -270,6 +271,7 @@ near-schema-checker-macro = { path = "core/schema-checker/schema-checker-macro"
near-schema-checker-core = { path = "core/schema-checker/schema-checker-core" }
near-schema-checker-lib = { path = "core/schema-checker/schema-checker-lib" }
near-store = { path = "core/store" }
near-transactions-generator = { path = "benchmarks/transactions-generator"}
near-telemetry = { path = "chain/telemetry" }
near-test-contracts = { path = "runtime/near-test-contracts" }
near-time = { path = "core/time" }
Expand Down
30 changes: 30 additions & 0 deletions benchmarks/transactions-generator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
[package]
name = "near-transactions-generator"
version.workspace = true
authors.workspace = true
edition.workspace = true
rust-version.workspace = true
repository.workspace = true
license.workspace = true

[features]
default = ["with_actix"]
with_actix = ["near-async", "near-network", "actix"]

[dependencies]
# clap = { workspace = true, features = ["derive"] }
actix = { workspace = true, optional = true }
anyhow.workspace = true
near-async = {workspace = true, optional = true }
near-crypto = { workspace = true }
near-network = { workspace = true, optional = true }
near-primitives = { workspace = true, features = ["clock", "test_utils"] }
rand.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true, features = ["std"] }


[lints]
workspace = true
72 changes: 72 additions & 0 deletions benchmarks/transactions-generator/src/account.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use near_primitives::types::AccountId;
use near_crypto::{PublicKey, SecretKey, InMemorySigner, Signer};
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::Path;
use anyhow::Context;


#[derive(Serialize, Deserialize, Clone, Debug) ]
pub struct Account {
#[serde(rename = "account_id")]
pub id: AccountId,
pub public_key: PublicKey,
pub secret_key: SecretKey,
// New transaction must have a nonce bigger than this.
pub nonce: u64,
}

impl Account {
pub fn new(id: AccountId, secret_key: SecretKey, nonce: u64) -> Self {
Self { id, public_key: secret_key.public_key(), secret_key, nonce }
}

pub fn from_file(path: &Path) -> anyhow::Result<Account> {
let content = fs::read_to_string(path)?;
let account = serde_json::from_str(&content)
.with_context(|| format!("failed reading file {path:?} as 'Account'"))?;
Ok(account)
}

pub fn write_to_dir(&self, dir: &Path) -> anyhow::Result<()> {
if !dir.exists() {
std::fs::create_dir(dir)?;
}

let json = serde_json::to_string(self)?;
let mut file_name = self.id.to_string();
file_name.push_str(".json");
let file_path = dir.join(file_name);
fs::write(file_path, json)?;
Ok(())
}

pub fn as_signer(&self) -> Signer {
Signer::from(InMemorySigner::from_secret_key(self.id.clone(), self.secret_key.clone()))
}
} // impl Account

/// Tries to deserialize all json files in `dir` as [`Account`].
pub fn accounts_from_dir(dir: &Path) -> anyhow::Result<Vec<Account>> {
if !dir.is_dir() {
anyhow::bail!("{:?} is not a directory", dir);
}

let mut accounts = vec![];
for entry in fs::read_dir(dir)? {
let entry = entry?;
let file_type = entry.file_type()?;
if !file_type.is_file() {
continue;
}
let path = entry.path();
let file_extension = path.extension();
if file_extension.is_none() || file_extension.unwrap() != "json" {
continue;
}
let account = Account::from_file(&path)?;
accounts.push(account);
}

Ok(accounts)
}
52 changes: 52 additions & 0 deletions benchmarks/transactions-generator/src/actix_actor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use crate::{TxGenerator, TxGeneratorConfig, ClientSender, ViewClientSender};

use actix::Actor;
use near_async::actix_wrapper::ActixWrapper;
use near_async::futures::DelayedActionRunner;
use near_async::messaging::{self};


pub type TxGeneratorActor = ActixWrapper<GeneratorActorImpl>;

pub struct GeneratorActorImpl {
// client_sender: ClientSender,
// view_client_sender: ViewClientSender,
tx_generator: TxGenerator,
}

impl messaging::Actor for GeneratorActorImpl {
fn start_actor(&mut self, ctx: &mut dyn DelayedActionRunner<Self>){
self.start(ctx)
}
}

impl GeneratorActorImpl {
pub fn start(&mut self, _ctx: &mut dyn DelayedActionRunner<Self>){
match self.tx_generator.run() {
Err(err) => {
tracing::error!(target: "transaction-generator", "Error: {err}");
},
Ok(_) => {
tracing::info!(target: "transaction-generator", "Started");
}
};
}
}

pub fn start_tx_generator(
config: TxGeneratorConfig,
client_sender: ClientSender,
view_client_sender: ViewClientSender,
)-> actix::Addr<TxGeneratorActor>
{
let arbiter = actix::Arbiter::new();
let tx_generator = TxGenerator::new(config, client_sender, view_client_sender).unwrap();
TxGeneratorActor::start_in_arbiter(&arbiter.handle(),
move |_| {
let actor_impl = GeneratorActorImpl {
tx_generator,
};
ActixWrapper::new(actor_impl)
}
)
}
122 changes: 122 additions & 0 deletions benchmarks/transactions-generator/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use near_async::messaging::Sender;
use near_async::{MultiSend, MultiSenderFrom};
use near_network::client::{BlockRequest, ProcessTxRequest};
use near_primitives::hash::CryptoHash;
use near_primitives::transaction::SignedTransaction;
use rand::SeedableRng;
use rand::distributions::{Distribution, Uniform};
use rand::rngs::StdRng;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};


pub mod account;
#[cfg(feature="with_actix")]
pub mod actix_actor;


#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct TxGeneratorConfig {
tps: u64,
volume: u64,
accounts_path: PathBuf,
}

impl Default for TxGeneratorConfig {
fn default()-> Self {
Self {
tps: 0,
volume: 40000,
accounts_path: "".into(),
}
}
}

#[derive(Clone, MultiSend, MultiSenderFrom)]
pub struct ClientSender{
pub tx_request_sender: Sender<ProcessTxRequest>,
}

#[derive(Clone, MultiSend, MultiSenderFrom)]
pub struct ViewClientSender{
pub tx_request_sender: Sender<BlockRequest>,
}


pub struct TxGenerator {
params: TxGeneratorConfig,
client_sender: ClientSender,
view_client_sender: ViewClientSender,
runner: Option<(thread::JoinHandle<()>, Arc<AtomicBool>)>,
}

impl TxGenerator {
pub fn new(
params: TxGeneratorConfig,
client_sender: ClientSender,
view_client_sender: ViewClientSender
)-> anyhow::Result<Self>
{
Ok(Self {
params, client_sender, view_client_sender, runner: None,
})
}

fn run(self: &mut Self)-> anyhow::Result<()> {
const AMOUNT: near_primitives::types::Balance = 1_000;
if let Some(_) = self.runner {
anyhow::bail!("attempt to (re)start the running transaction generator");
}
let accounts = account::accounts_from_dir(&self.params.accounts_path)?;
if accounts.is_empty() {
anyhow::bail!("No active accounts available");
}
let client_sender = self.client_sender.clone();
let stop_token = Arc::new(AtomicBool::new(false));
let stop = stop_token.clone();
let time_interval = Duration::from_micros(1_000_000/self.params.tps);

let handle = thread::spawn(move ||{
let mut rnd: StdRng = SeedableRng::from_entropy();
tracing::info!(target: "transaction-generator", "thread starting"); // debug

let mut next_tx_at = Instant::now();
while !stop.load(std::sync::atomic::Ordering::Acquire) {
// todo(slavas): use thread::sleep_until(next_tx_at) once Rust gets it

Check warning on line 88 in benchmarks/transactions-generator/src/lib.rs

View workflow job for this annotation

GitHub Actions / spellcheck

Unknown word (slavas)
if let Some(delay) = next_tx_at.checked_duration_since(Instant::now()) {
thread::sleep(delay);
}
let block_hash = CryptoHash::default(); // todo(slavas): fix before merge

Check warning on line 92 in benchmarks/transactions-generator/src/lib.rs

View workflow job for this annotation

GitHub Actions / spellcheck

Unknown word (slavas)

let id_sender = Uniform::from(0..accounts.len()).sample(&mut rnd);
let id_recv = loop {
let candidate = Uniform::from(0..accounts.len()).sample(&mut rnd);
if candidate != id_sender {
break candidate;
}
};

let sender = &accounts[id_sender];
let receiver = &accounts[id_recv];
let transaction = SignedTransaction::send_money(
sender.nonce + 1,
sender.id.clone(),
receiver.id.clone(),
&sender.as_signer(),
AMOUNT,
block_hash.clone(),
);

client_sender.tx_request_sender.send(ProcessTxRequest{
transaction, is_forwarded: false, check_only: false});
next_tx_at += time_interval;
}
});

self.runner = Some((handle, stop_token));
Ok(())
}
}
4 changes: 3 additions & 1 deletion nearcore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ near-primitives.workspace = true
near-rosetta-rpc = { workspace = true, optional = true }
near-store.workspace = true
near-telemetry.workspace = true
near-transactions-generator = { workspace = true, optional = true }
near-vm-runner.workspace = true
node-runtime.workspace = true
near-config-utils.workspace = true
Expand All @@ -86,7 +87,7 @@ name = "store"
harness = false

[features]
default = ["json_rpc", "rosetta_rpc"]
default = ["json_rpc", "rosetta_rpc", "tx_generator"] # todo(slavas): fix before merge. remove tx_generator from defaults

performance_stats = [
"near-performance-metrics/performance_stats",
Expand Down Expand Up @@ -172,3 +173,4 @@ sandbox = [
io_trace = ["near-vm-runner/io_trace"]

calimero_zero_storage = ["near-primitives/calimero_zero_storage"]
tx_generator = ["near-transactions-generator"]
8 changes: 8 additions & 0 deletions nearcore/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ pub struct Config {
#[cfg(feature = "rosetta_rpc")]
#[serde(skip_serializing_if = "Option::is_none")]
pub rosetta_rpc: Option<RosettaRpcConfig>,
#[cfg(feature = "tx_generator")]
pub tx_generator: Option<near_transactions_generator::TxGeneratorConfig>,
pub telemetry: TelemetryConfig,
pub network: near_network::config_json::Config,
pub consensus: Consensus,
Expand Down Expand Up @@ -355,6 +357,8 @@ impl Default for Config {
rpc: Some(RpcConfig::default()),
#[cfg(feature = "rosetta_rpc")]
rosetta_rpc: None,
#[cfg(feature = "tx_generator")]
tx_generator: None,
telemetry: TelemetryConfig::default(),
network: Default::default(),
consensus: Consensus::default(),
Expand Down Expand Up @@ -479,6 +483,8 @@ impl Config {
pub struct NearConfig {
pub config: Config,
pub client_config: ClientConfig,
#[cfg(feature = "tx_generator")]
pub tx_generator: Option<near_transactions_generator::TxGeneratorConfig>,
pub network_config: NetworkConfig,
#[cfg(feature = "json_rpc")]
pub rpc_config: Option<RpcConfig>,
Expand Down Expand Up @@ -573,6 +579,8 @@ impl NearConfig {
orphan_state_witness_max_size: config.orphan_state_witness_max_size,
save_latest_witnesses: config.save_latest_witnesses,
},
#[cfg(feature = "tx_generator")]
tx_generator: config.tx_generator,
network_config: NetworkConfig::new(
config.network,
network_key_pair.secret_key,
Expand Down
Loading

0 comments on commit 44ec8e2

Please sign in to comment.