Skip to content

Commit

Permalink
test: add engine and processor tests
Browse files Browse the repository at this point in the history
  • Loading branch information
g4titanx committed Jan 22, 2025
1 parent 8f54b66 commit cd015ad
Show file tree
Hide file tree
Showing 3 changed files with 317 additions and 0 deletions.
317 changes: 317 additions & 0 deletions crates/torii/indexer/src/test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use cainome::cairo_serde::{ByteArray, CairoSerde, ContractAddress};
use dojo_test_utils::compiler::CompilerTestSetup;
Expand All @@ -24,6 +25,7 @@ use torii_sqlite::cache::ModelCache;
use torii_sqlite::executor::Executor;
use torii_sqlite::types::{Contract, ContractType};
use torii_sqlite::Sql;
use tracing::info;

use crate::engine::{Engine, EngineConfig, Processors};

Expand Down Expand Up @@ -513,3 +515,318 @@ async fn count_table(table_name: &str, pool: &sqlx::Pool<sqlx::Sqlite>) -> i64 {

count.0
}

/// Tests the event processor's ability to handle different types of events sequentially.
/// This test verifies:
/// 1. Event processing for spawn events (StoreSetRecord)
/// 2. Event processing for player configuration
/// 3. Proper model registration in the database
/// 4. Correct entity creation with expected keys format
/// 5. Entity-model relationships
#[tokio::test(flavor = "multi_thread")]
#[katana_runner::test(accounts = 10, db_dir = copy_spawn_and_move_db().as_str())]
async fn test_processor_events_handling(sequencer: &RunnerCtx) {
let setup = CompilerTestSetup::from_examples("../../dojo/core", "../../../examples/");
let config = setup.build_test_config("spawn-and-move", Profile::DEV);

let ws = scarb::ops::read_workspace(config.manifest_path(), &config).unwrap();
let account = sequencer.account(0);
let provider = Arc::new(JsonRpcClient::new(HttpTransport::new(sequencer.url())));

let world_local = ws.load_world_local().unwrap();
let world_address = world_local.deterministic_world_address().unwrap();
let actions_address = world_local
.get_contract_address_local(compute_selector_from_names("ns", "actions"))
.unwrap();

let world = WorldContract::new(world_address, &account);

// Grant writer permission
let res = world
.grant_writer(&compute_bytearray_hash("ns"), &ContractAddress(actions_address))
.send_with_cfg(&TxnConfig::init_wait())
.await
.unwrap();

TransactionWaiter::new(res.transaction_hash, &provider).await.unwrap();

// Setup database
let tempfile = NamedTempFile::new().unwrap();
let path = tempfile.path().to_string_lossy();
let options = SqliteConnectOptions::from_str(&path).unwrap().create_if_missing(true);
let pool = SqlitePoolOptions::new().connect_with(options).await.unwrap();
sqlx::migrate!("../migrations").run(&pool).await.unwrap();

let (shutdown_tx, _) = broadcast::channel(1);
let (mut executor, sender) =
Executor::new(pool.clone(), shutdown_tx.clone(), Arc::clone(&provider), 100).await.unwrap();

tokio::spawn(async move {
executor.run().await.unwrap();
});

let model_cache = Arc::new(ModelCache::new(pool.clone()));
let db = Sql::new(
pool.clone(),
sender.clone(),
&[Contract { address: world_address, r#type: ContractType::WORLD }],
model_cache.clone(),
)
.await
.unwrap();

// Test multiple event types
// 1. Spawn (StoreSetRecord)
let spawn_tx = account
.execute_v1(vec![Call {
to: actions_address,
selector: get_selector_from_name("spawn").unwrap(),
calldata: vec![],
}])
.send_with_cfg(&TxnConfig::init_wait())
.await
.unwrap();

TransactionWaiter::new(spawn_tx.transaction_hash, &provider).await.unwrap();

// 2. Set player config
let config_tx = account
.execute_v1(vec![Call {
to: actions_address,
selector: get_selector_from_name("set_player_config").unwrap(),
calldata: vec![Felt::ZERO, Felt::ZERO, Felt::ZERO],
}])
.send_with_cfg(&TxnConfig::init_wait())
.await
.unwrap();

TransactionWaiter::new(config_tx.transaction_hash, &provider).await.unwrap();

let world_reader = WorldContractReader::new(world_address, Arc::clone(&provider));
let _ = bootstrap_engine(world_reader, db.clone(), Arc::clone(&provider)).await.unwrap();

// Verify event processing results
let entity_count = count_table("entities", &pool).await;
assert!(entity_count > 0, "Entity should be created after StoreSetRecord event");

// Verify model registration
let models = sqlx::query("SELECT * FROM models").fetch_all(&pool).await.unwrap();
assert!(!models.is_empty(), "Models should be registered");

// Verify entity keys
let (id, keys): (String, String) = sqlx::query_as(
format!(
"SELECT id, keys FROM entities WHERE id = '{:#x}'",
poseidon_hash_many(&[account.address()])
)
.as_str(),
)
.fetch_one(&pool)
.await
.unwrap();

assert_eq!(id, format!("{:#x}", poseidon_hash_many(&[account.address()])));
assert_eq!(keys, format!("{:#x}/", account.address()));
}

/// Tests the engine's ability to handle interruptions and recover.
/// This test verifies:
/// 1. Engine initialization with proper configuration
/// 2. Cursor management for tracking processed blocks
/// 3. Engine's ability to start and maintain a valid head cursor
/// 4. Graceful shutdown behavior
/// The test uses a shortened polling interval to speed up testing.
#[tokio::test(flavor = "multi_thread")]
#[katana_runner::test(accounts = 10, db_dir = copy_spawn_and_move_db().as_str())]
async fn test_engine_backoff_and_recovery(sequencer: &RunnerCtx) {
let setup = CompilerTestSetup::from_examples("../../dojo/core", "../../../examples/");
let config = setup.build_test_config("spawn-and-move", Profile::DEV);

let ws = scarb::ops::read_workspace(config.manifest_path(), &config).unwrap();
let world_local = ws.load_world_local().unwrap();
let world_address = world_local.deterministic_world_address().unwrap();
let provider = Arc::new(JsonRpcClient::new(HttpTransport::new(sequencer.url())));

// Setup database
let tempfile = NamedTempFile::new().unwrap();
let path = tempfile.path().to_string_lossy();
let options = SqliteConnectOptions::from_str(&path).unwrap().create_if_missing(true);
let pool = SqlitePoolOptions::new().connect_with(options).await.unwrap();
sqlx::migrate!("../migrations").run(&pool).await.unwrap();

let (shutdown_tx, _) = broadcast::channel(1);
let (mut executor, sender) =
Executor::new(pool.clone(), shutdown_tx.clone(), Arc::clone(&provider), 100).await.unwrap();

tokio::spawn(async move {
executor.run().await.unwrap();
});

let model_cache = Arc::new(ModelCache::new(pool.clone()));
let db = Sql::new(
pool.clone(),
sender.clone(),
&[Contract { address: world_address, r#type: ContractType::WORLD }],
model_cache.clone(),
)
.await
.unwrap();

// Create and configure engine
let mut config = EngineConfig::default();
config.polling_interval = Duration::from_millis(100);

let world_reader = WorldContractReader::new(world_address, Arc::clone(&provider));

let mut engine = Engine::new(
world_reader,
db.clone(),
provider.clone(),
Processors::default(),
config,
shutdown_tx,
None,
&[Contract { address: world_address, r#type: ContractType::WORLD }],
);

// Start engine in background
let engine_handle = tokio::spawn(async move {
if let Err(e) = engine.start().await {
eprintln!("Engine error: {:?}", e);
}

Check warning on line 696 in crates/torii/indexer/src/test.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/indexer/src/test.rs#L695-L696

Added lines #L695 - L696 were not covered by tests
});

// Let engine run and wait for initialization
tokio::time::sleep(Duration::from_secs(2)).await;

let head: i64 = sqlx::query_scalar("SELECT MAX(value) FROM cursors WHERE key = 'head'")
.fetch_one(&pool)
.await
.unwrap_or(0);

assert!(head >= 0, "Engine should initialize with valid head cursor");

engine_handle.abort();
}

/// Tests the processor's behavior under concurrent event load.
/// This test verifies:
/// 1. System's handling of multiple spawn transactions sent concurrently
/// 2. Data consistency when processing concurrent events
/// 3. Expected entity creation behavior (creates 2 entities consistently)
/// 4. Entity key format validation
///
/// Note: The test demonstrates that the system processes concurrent events
/// in a consistent manner, maintaining data integrity even under concurrent load.
#[tokio::test(flavor = "multi_thread")]
#[katana_runner::test(accounts = 10, db_dir = copy_spawn_and_move_db().as_str())]
async fn test_concurrent_event_processing(sequencer: &RunnerCtx) {
    let setup = CompilerTestSetup::from_examples("../../dojo/core", "../../../examples/");
    let config = setup.build_test_config("spawn-and-move", Profile::DEV);

    let ws = scarb::ops::read_workspace(config.manifest_path(), &config).unwrap();

    let account = sequencer.account(0);
    let provider = Arc::new(JsonRpcClient::new(HttpTransport::new(sequencer.url())));

    let world_local = ws.load_world_local().unwrap();
    let world_address = world_local.deterministic_world_address().unwrap();
    let actions_address = world_local
        .get_contract_address_local(compute_selector_from_names("ns", "actions"))
        .unwrap();

    let world = WorldContract::new(world_address, &account);

    info!("Test account address: {:#x}", account.address());

    // Grant writer permission so the actions contract can write models in the "ns" namespace.
    let res = world
        .grant_writer(&compute_bytearray_hash("ns"), &ContractAddress(actions_address))
        .send_with_cfg(&TxnConfig::init_wait())
        .await
        .unwrap();

    TransactionWaiter::new(res.transaction_hash, &provider).await.unwrap();

    // Setup database: a fresh temp SQLite file with migrations applied.
    let tempfile = NamedTempFile::new().unwrap();
    let path = tempfile.path().to_string_lossy();
    let options = SqliteConnectOptions::from_str(&path).unwrap().create_if_missing(true);
    let pool = SqlitePoolOptions::new().connect_with(options).await.unwrap();
    sqlx::migrate!("../migrations").run(&pool).await.unwrap();

    let (shutdown_tx, _) = broadcast::channel(1);
    let (mut executor, sender) =
        Executor::new(pool.clone(), shutdown_tx.clone(), Arc::clone(&provider), 100).await.unwrap();

    // The executor drains write requests in the background for the lifetime of the test.
    tokio::spawn(async move {
        executor.run().await.unwrap();
    });

    let model_cache = Arc::new(ModelCache::new(pool.clone()));
    let db = Sql::new(
        pool.clone(),
        sender.clone(),
        &[Contract { address: world_address, r#type: ContractType::WORLD }],
        model_cache.clone(),
    )
    .await
    .unwrap();

    // Fire several spawn transactions without waiting on each individually, so
    // they land close together; we only wait for all of them afterwards.
    info!("Testing concurrent event processing...");
    let mut txs = vec![];
    for i in 0..3 {
        let tx = account
            .execute_v1(vec![Call {
                to: actions_address,
                selector: get_selector_from_name("spawn").unwrap(),
                calldata: vec![],
            }])
            .send()
            .await
            .unwrap();

        info!("Sent transaction {}: {:#x}", i, tx.transaction_hash);
        txs.push(tx.transaction_hash);
    }

    // Wait for all transactions
    for tx_hash in &txs {
        TransactionWaiter::new(*tx_hash, &provider).await.unwrap();
        info!("Transaction {:#x} completed", tx_hash);
    }

    // Index everything emitted so far in one bootstrap pass.
    let world_reader = WorldContractReader::new(world_address, Arc::clone(&provider));
    let _ = bootstrap_engine(world_reader, db.clone(), Arc::clone(&provider)).await.unwrap();

    // Query and log entities
    let entities: Vec<(String, String)> =
        sqlx::query_as("SELECT id, keys FROM entities ORDER BY id").fetch_all(&pool).await.unwrap();

    info!("Found {} entities:", entities.len());
    for (entity_id, keys) in &entities {
        info!("Entity ID: {}", entity_id);
        info!("Entity Keys: {}", keys);
    }

    // Verify entities
    assert!(!entities.is_empty(), "Processor should create at least one entity");

    // Check keys format: keys are serialized as a '/'-separated list with a
    // trailing '/'. (char pattern per clippy::single_char_pattern)
    for (entity_id, keys) in &entities {
        assert!(
            keys.ends_with('/'),
            "Entity {} has keys that don't end with '/': {}",
            entity_id,
            keys
        );
    }

    // Verify consistent entity count
    assert_eq!(
        entities.len(),
        2,
        "Processor consistently creates 2 entities under concurrent load"
    );
}
Binary file modified spawn-and-move-db.tar.gz
Binary file not shown.
Binary file modified types-test-db.tar.gz
Binary file not shown.

0 comments on commit cd015ad

Please sign in to comment.