Skip to content

Commit

Permalink
feat(torii): erc options for max tasks & artifacts path (#3061)
Browse files Browse the repository at this point in the history
* feat(torii): erc options for max tasks & artifacts path

* fmt

* fix naming

* fmt

* scope erc metadat tasks semaphore
  • Loading branch information
Larkooo authored Feb 26, 2025
1 parent 48cd046 commit 21b1c79
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 15 deletions.
14 changes: 9 additions & 5 deletions crates/torii/cli/src/args.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::path::PathBuf;

use anyhow::Result;
use camino::Utf8PathBuf;
use clap::Parser;
use dojo_utils::parse::parse_url;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -43,16 +42,15 @@ pub struct ToriiArgs {
#[arg(long, help = "Configuration file to setup Torii.")]
pub config: Option<PathBuf>,

/// Path to a directory to store ERC artifacts
#[arg(long)]
pub artifacts_path: Option<Utf8PathBuf>,

#[command(flatten)]
pub indexing: IndexingOptions,

#[command(flatten)]
pub events: EventsOptions,

#[command(flatten)]
pub erc: ErcOptions,

#[cfg(feature = "server")]
#[command(flatten)]
pub metrics: MetricsOptions,
Expand Down Expand Up @@ -105,6 +103,10 @@ impl ToriiArgs {
self.events = config.events.unwrap_or_default();
}

if self.erc == ErcOptions::default() {
self.erc = config.erc.unwrap_or_default();
}

#[cfg(feature = "server")]
{
if self.server == ServerOptions::default() {
Expand Down Expand Up @@ -133,6 +135,7 @@ pub struct ToriiArgsConfig {
pub explorer: Option<bool>,
pub indexing: Option<IndexingOptions>,
pub events: Option<EventsOptions>,
pub erc: Option<ErcOptions>,
#[cfg(feature = "server")]
pub metrics: Option<MetricsOptions>,
#[cfg(feature = "server")]
Expand Down Expand Up @@ -163,6 +166,7 @@ impl TryFrom<ToriiArgs> for ToriiArgsConfig {
if args.indexing == IndexingOptions::default() { None } else { Some(args.indexing) };
config.events =
if args.events == EventsOptions::default() { None } else { Some(args.events) };
config.erc = if args.erc == ErcOptions::default() { None } else { Some(args.erc) };

#[cfg(feature = "server")]
{
Expand Down
36 changes: 33 additions & 3 deletions crates/torii/cli/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::net::{IpAddr, Ipv4Addr};
use std::str::FromStr;

use anyhow::Context;
use camino::Utf8PathBuf;
use serde::ser::SerializeSeq;
use serde::{Deserialize, Serialize};
use starknet::core::types::Felt;
Expand All @@ -15,11 +16,12 @@ pub const DEFAULT_EVENTS_CHUNK_SIZE: u64 = 1024;
pub const DEFAULT_BLOCKS_CHUNK_SIZE: u64 = 10240;
pub const DEFAULT_POLLING_INTERVAL: u64 = 500;
pub const DEFAULT_MAX_CONCURRENT_TASKS: usize = 100;

pub const DEFAULT_RELAY_PORT: u16 = 9090;
pub const DEFAULT_RELAY_WEBRTC_PORT: u16 = 9091;
pub const DEFAULT_RELAY_WEBSOCKET_PORT: u16 = 9092;

pub const DEFAULT_ERC_MAX_METADATA_TASKS: usize = 10;

#[derive(Debug, clap::Args, Clone, Serialize, Deserialize, PartialEq)]
#[command(next_help_heading = "Relay options")]
pub struct RelayOptions {
Expand Down Expand Up @@ -118,11 +120,11 @@ pub struct IndexingOptions {
#[serde(default = "default_polling_interval")]
pub polling_interval: u64,

/// Max concurrent tasks
/// Maximum number of concurrent tasks used for processing parallelizable events.
#[arg(
long = "indexing.max_concurrent_tasks",
default_value_t = DEFAULT_MAX_CONCURRENT_TASKS,
help = "Max concurrent tasks used to parallelize indexing."
help = "Maximum number of concurrent tasks processing parallelizable events."
)]
#[serde(default = "default_max_concurrent_tasks")]
pub max_concurrent_tasks: usize,
Expand Down Expand Up @@ -346,6 +348,30 @@ impl Default for MetricsOptions {
}
}

#[derive(Debug, clap::Args, Clone, Serialize, Deserialize, PartialEq)]
#[command(next_help_heading = "ERC options")]
pub struct ErcOptions {
/// The maximum number of concurrent tasks to use for indexing ERC721 and ERC1155 token
/// metadata.
#[arg(
long = "erc.max_metadata_tasks",
default_value_t = DEFAULT_ERC_MAX_METADATA_TASKS,
help = "The maximum number of concurrent tasks to use for indexing ERC721 and ERC1155 token metadata."
)]
#[serde(default = "default_erc_max_metadata_tasks")]
pub max_metadata_tasks: usize,

/// Path to a directory to store ERC artifacts
#[arg(long)]
pub artifacts_path: Option<Utf8PathBuf>,
}

impl Default for ErcOptions {
fn default() -> Self {
Self { max_metadata_tasks: DEFAULT_ERC_MAX_METADATA_TASKS, artifacts_path: None }
}
}

// Parses clap cli argument which is expected to be in the format:
// - erc_type:address:start_block
// - address:start_block (erc_type defaults to ERC20)
Expand Down Expand Up @@ -433,3 +459,7 @@ fn default_relay_webrtc_port() -> u16 {
fn default_relay_websocket_port() -> u16 {
DEFAULT_RELAY_WEBSOCKET_PORT
}

fn default_erc_max_metadata_tasks() -> usize {
DEFAULT_ERC_MAX_METADATA_TASKS
}
3 changes: 2 additions & 1 deletion crates/torii/runner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl Runner {
pool.clone(),
shutdown_tx.clone(),
provider.clone(),
self.args.indexing.max_concurrent_tasks,
self.args.erc.max_metadata_tasks,
)
.await?;
let executor_handle = tokio::spawn(async move { executor.run().await });
Expand Down Expand Up @@ -208,6 +208,7 @@ impl Runner {
let temp_dir = TempDir::new()?;
let artifacts_path = self
.args
.erc
.artifacts_path
.unwrap_or_else(|| Utf8PathBuf::from(temp_dir.path().to_str().unwrap()));

Expand Down
12 changes: 6 additions & 6 deletions crates/torii/sqlite/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ pub struct Executor<'c, P: Provider + Sync + Send + 'static> {
// It is used to make RPC calls to fetch token_uri data for erc721 contracts
provider: Arc<P>,
// Used to limit number of tasks that run in parallel to fetch metadata
semaphore: Arc<Semaphore>,
metadata_semaphore: Arc<Semaphore>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -234,13 +234,13 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
pool: Pool<Sqlite>,
shutdown_tx: Sender<()>,
provider: Arc<P>,
max_concurrent_tasks: usize,
max_metadata_tasks: usize,
) -> Result<(Self, UnboundedSender<QueryMessage>)> {
let (tx, rx) = unbounded_channel();
let transaction = pool.begin().await?;
let publish_queue = Vec::new();
let shutdown_rx = shutdown_tx.subscribe();
let semaphore = Arc::new(Semaphore::new(max_concurrent_tasks));
let metadata_semaphore = Arc::new(Semaphore::new(max_metadata_tasks));

Ok((
Executor {
Expand All @@ -252,7 +252,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
register_tasks: JoinSet::new(),
deferred_query_messages: Vec::new(),
provider,
semaphore,
metadata_semaphore,
},
tx,
))
Expand Down Expand Up @@ -603,7 +603,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
debug!(target: LOG_TARGET, duration = ?instant.elapsed(), "Applied balance diff.");
}
QueryType::RegisterNftToken(register_nft_token) => {
let semaphore = self.semaphore.clone();
let metadata_semaphore = self.metadata_semaphore.clone();
let provider = self.provider.clone();

let res = sqlx::query_as::<_, (String, String)>(&format!(
Expand Down Expand Up @@ -675,7 +675,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
};

self.register_tasks.spawn(async move {
let permit = semaphore.acquire().await.unwrap();
let permit = metadata_semaphore.acquire().await.unwrap();

let result = Self::process_register_nft_token_query(
register_nft_token,
Expand Down

0 comments on commit 21b1c79

Please sign in to comment.