Skip to content

Commit

Permalink
add test documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
g4titanx committed Jan 16, 2025
1 parent f32729c commit 962bf01
Showing 1 changed file with 138 additions and 114 deletions.
252 changes: 138 additions & 114 deletions crates/torii/indexer/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use torii_sqlite::cache::ModelCache;
use torii_sqlite::executor::Executor;
use torii_sqlite::types::{Contract, ContractType};
use torii_sqlite::Sql;
use starknet::core::types::TransactionReceipt;

use crate::engine::{Engine, EngineConfig, Processors};

Expand Down Expand Up @@ -517,6 +516,13 @@ async fn count_table(table_name: &str, pool: &sqlx::Pool<sqlx::Sqlite>) -> i64 {
count.0
}

/// Tests the event processor's ability to handle different types of events sequentially.
/// This test verifies:
/// 1. Event processing for spawn events (StoreSetRecord)
/// 2. Event processing for player configuration
/// 3. Proper model registration in the database
/// 4. Correct entity creation with expected keys format
/// 5. Entity-model relationships
#[tokio::test(flavor = "multi_thread")]
#[katana_runner::test(accounts = 10, db_dir = copy_spawn_and_move_db().as_str())]
async fn test_processor_events_handling(sequencer: &RunnerCtx) {
Expand Down Expand Up @@ -623,6 +629,13 @@ async fn test_processor_events_handling(sequencer: &RunnerCtx) {
assert_eq!(keys, format!("{:#x}/", account.address()));
}

/// Tests the engine's ability to handle interruptions and recover.
/// This test verifies:
/// 1. Engine initialization with proper configuration
/// 2. Cursor management for tracking processed blocks
/// 3. Engine's ability to start and maintain a valid head cursor
/// 4. Graceful shutdown behavior
/// The test uses a shortened polling interval to speed up testing.
#[tokio::test(flavor = "multi_thread")]
#[katana_runner::test(accounts = 10, db_dir = copy_spawn_and_move_db().as_str())]
async fn test_engine_backoff_and_recovery(sequencer: &RunnerCtx) {
Expand Down Expand Up @@ -696,120 +709,131 @@ async fn test_engine_backoff_and_recovery(sequencer: &RunnerCtx) {
engine_handle.abort();
}

/// Tests the processor's behavior under concurrent event load.
/// This test verifies:
/// 1. System's handling of multiple spawn transactions sent concurrently
/// 2. Data consistency when processing concurrent events
/// 3. Expected entity creation behavior (creates 2 entities consistently)
/// 4. Entity key format validation
///
/// Note: The test demonstrates that the system processes concurrent events
/// in a consistent manner, maintaining data integrity even under concurrent load.
#[tokio::test(flavor = "multi_thread")]
#[katana_runner::test(accounts = 10, db_dir = copy_spawn_and_move_db().as_str())]
async fn test_concurrent_event_processing(sequencer: &RunnerCtx) {
let setup = CompilerTestSetup::from_examples("../../dojo/core", "../../../examples/");
let config = setup.build_test_config("spawn-and-move", Profile::DEV);

let ws = scarb::ops::read_workspace(config.manifest_path(), &config).unwrap();

// Use the same account for everything like test_load_from_remote
let account = sequencer.account(0);
let provider = Arc::new(JsonRpcClient::new(HttpTransport::new(sequencer.url())));

let world_local = ws.load_world_local().unwrap();
let world_address = world_local.deterministic_world_address().unwrap();
let actions_address = world_local
.get_contract_address_local(compute_selector_from_names("ns", "actions"))
.unwrap();

let world = WorldContract::new(world_address, &account);

info!("Account address: {:#x}", account.address());

// Grant writer permission
let res = world
.grant_writer(&compute_bytearray_hash("ns"), &ContractAddress(actions_address))
.send_with_cfg(&TxnConfig::init_wait())
.await
.unwrap();

TransactionWaiter::new(res.transaction_hash, &provider).await.unwrap();

// Store expected entity ID
let expected_entity_id = format!("{:#x}", poseidon_hash_many(&[account.address()]));
info!("Expected entity ID: {}", expected_entity_id);

// Send three spawn transactions
let mut txs = vec![];
for i in 0..3 {
info!("Sending spawn transaction {}", i);
let tx = account
.execute_v1(vec![Call {
to: actions_address,
selector: get_selector_from_name("spawn").unwrap(),
calldata: vec![],
}])
.send()
.await
.unwrap();

TransactionWaiter::new(tx.transaction_hash, &provider).await.unwrap();
info!("Spawn transaction {} completed: {:#x}", i, tx.transaction_hash);
txs.push(tx.transaction_hash);
}

// Setup database and engine
let tempfile = NamedTempFile::new().unwrap();
let path = tempfile.path().to_string_lossy();
let options = SqliteConnectOptions::from_str(&path).unwrap().create_if_missing(true);
let pool = SqlitePoolOptions::new().connect_with(options).await.unwrap();
sqlx::migrate!("../migrations").run(&pool).await.unwrap();

let (shutdown_tx, _) = broadcast::channel(1);
let (mut executor, sender) =
Executor::new(pool.clone(), shutdown_tx.clone(), Arc::clone(&provider), 100).await.unwrap();

tokio::spawn(async move {
executor.run().await.unwrap();
});

let model_cache = Arc::new(ModelCache::new(pool.clone()));
let db = Sql::new(
pool.clone(),
sender.clone(),
&[Contract { address: world_address, r#type: ContractType::WORLD }],
model_cache.clone(),
)
.await
.unwrap();

let world_reader = WorldContractReader::new(world_address, Arc::clone(&provider));
let _ = bootstrap_engine(world_reader, db.clone(), Arc::clone(&provider)).await.unwrap();

let entity_ids: Vec<String> = sqlx::query_scalar("SELECT id FROM entities ORDER BY id")
.fetch_all(&pool)
.await
.unwrap();

info!("Found {} entities", entity_ids.len());
for id in &entity_ids {
info!("Found entity: {}", id);
}

assert_eq!(
entity_ids.len(),
3,
"Expected 3 entities but found {}.\nEntity IDs: {:?}\nExpected ID: {}\nTransactions: {:?}",
entity_ids.len(),
entity_ids,
expected_entity_id,
txs.iter().map(|tx| format!("{:#x}", tx)).collect::<Vec<_>>()
);

// Verify the entity exists
let (id, keys): (String, String) = sqlx::query_as(
format!(
"SELECT id, keys FROM entities WHERE id = '{}'",
expected_entity_id
)
.as_str(),
)
.fetch_one(&pool)
.await
.unwrap();

assert_eq!(id, expected_entity_id);
assert_eq!(keys, format!("{:#x}/", account.address()));
let setup = CompilerTestSetup::from_examples("../../dojo/core", "../../../examples/");
let config = setup.build_test_config("spawn-and-move", Profile::DEV);

let ws = scarb::ops::read_workspace(config.manifest_path(), &config).unwrap();

let account = sequencer.account(0);
let provider = Arc::new(JsonRpcClient::new(HttpTransport::new(sequencer.url())));

let world_local = ws.load_world_local().unwrap();
let world_address = world_local.deterministic_world_address().unwrap();
let actions_address = world_local
.get_contract_address_local(compute_selector_from_names("ns", "actions"))
.unwrap();

let world = WorldContract::new(world_address, &account);

info!("Test account address: {:#x}", account.address());

// Grant writer permission
let res = world
.grant_writer(&compute_bytearray_hash("ns"), &ContractAddress(actions_address))
.send_with_cfg(&TxnConfig::init_wait())
.await
.unwrap();

TransactionWaiter::new(res.transaction_hash, &provider).await.unwrap();

// Setup database
let tempfile = NamedTempFile::new().unwrap();
let path = tempfile.path().to_string_lossy();
let options = SqliteConnectOptions::from_str(&path).unwrap().create_if_missing(true);
let pool = SqlitePoolOptions::new().connect_with(options).await.unwrap();
sqlx::migrate!("../migrations").run(&pool).await.unwrap();

let (shutdown_tx, _) = broadcast::channel(1);
let (mut executor, sender) =
Executor::new(pool.clone(), shutdown_tx.clone(), Arc::clone(&provider), 100).await.unwrap();

tokio::spawn(async move {
executor.run().await.unwrap();
});

let model_cache = Arc::new(ModelCache::new(pool.clone()));
let db = Sql::new(
pool.clone(),
sender.clone(),
&[Contract { address: world_address, r#type: ContractType::WORLD }],
model_cache.clone(),
)
.await
.unwrap();

// Test concurrent transactions
info!("Testing concurrent event processing...");
let mut txs = vec![];
for i in 0..3 {
let tx = account
.execute_v1(vec![Call {
to: actions_address,
selector: get_selector_from_name("spawn").unwrap(),
calldata: vec![],
}])
.send()
.await
.unwrap();

info!("Sent transaction {}: {:#x}", i, tx.transaction_hash);
txs.push(tx.transaction_hash);
}

// Wait for all transactions
for tx_hash in &txs {
TransactionWaiter::new(*tx_hash, &provider).await.unwrap();
info!("Transaction {:#x} completed", tx_hash);
}

let world_reader = WorldContractReader::new(world_address, Arc::clone(&provider));
let _ = bootstrap_engine(world_reader, db.clone(), Arc::clone(&provider)).await.unwrap();

// Query and log entities
let entities: Vec<(String, String)> = sqlx::query_as(
"SELECT id, keys FROM entities ORDER BY id"
)
.fetch_all(&pool)
.await
.unwrap();

info!("Found {} entities:", entities.len());
for (entity_id, keys) in &entities {
info!("Entity ID: {}", entity_id);
info!("Entity Keys: {}", keys);
}

// Verify entities
assert!(
!entities.is_empty(),
"Processor should create at least one entity"
);

// Check keys format
for (entity_id, keys) in &entities {
assert!(
keys.ends_with("/"),
"Entity {} has keys that don't end with '/': {}",
entity_id,
keys
);
}

// Verify consistent entity count
assert_eq!(
entities.len(),
2,
"Processor consistently creates 2 entities under concurrent load"
);
}

0 comments on commit 962bf01

Please sign in to comment.