Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(torii-indexer): single fetch range but chunk block processing #2899

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 110 additions & 88 deletions crates/torii/indexer/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ pub struct ParallelizedEvent {
pub event: Event,
}

/// Block range `[from, to]` to fetch events for, for a single contract.
/// Produced by `get_contract_ranges` and consumed by `fetch_range`
/// (used as `from_block`/`to_block` of an `EventFilter`, so presumably
/// inclusive on both ends — TODO confirm against the provider's semantics).
#[derive(Debug)]
struct ContractRange {
    // First block of the window (cursor head / first-event probe result).
    from: u64,
    // Last block of the window; at most `blocks_chunk_size` past `from`.
    to: u64,
}

#[allow(missing_debug_implementations)]
pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
world: Arc<WorldContractReader<P>>,
Expand Down Expand Up @@ -321,116 +327,132 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
}
}

// TODO: since we now process blocks in chunks we can parallelize the fetching of data
pub async fn fetch_data(&mut self, cursors: &Cursors) -> Result<FetchDataResult> {
async fn fetch_data(&mut self, cursors: &Cursors) -> Result<FetchDataResult> {
let latest_block = self.provider.block_hash_and_number().await?;

let from = cursors.head.unwrap_or(self.config.world_block);
let total_remaining_blocks = latest_block.block_number - from;
let blocks_to_process = total_remaining_blocks.min(self.config.blocks_chunk_size);
let to = from + blocks_to_process;

let instant = Instant::now();
let result = if from < latest_block.block_number {
let from = if from == 0 { from } else { from + 1 };
let data = self.fetch_range(from, to, &cursors.cursor_map).await?;
debug!(target: LOG_TARGET, duration = ?instant.elapsed(), from = %from, to = %to, "Fetched data for range.");
FetchDataResult::Range(data)
// If we're not caught up with the latest block
if cursors.head.unwrap_or(self.config.world_block) < latest_block.block_number {
let instant = Instant::now();

// Get contract event ranges
let contract_ranges =
self.get_contract_ranges(cursors, latest_block.block_number).await?;

if contract_ranges.is_empty() {
return Ok(FetchDataResult::None);
}

let data = self.fetch_range(contract_ranges).await?;

debug!(
target: LOG_TARGET,
duration = ?instant.elapsed(),
"Fetched data for custom ranges."
);

Ok(FetchDataResult::Range(data))
} else if self.config.flags.contains(IndexingFlags::PENDING_BLOCKS) {
let data =
self.fetch_pending(latest_block.clone(), cursors.last_pending_block_tx).await?;
debug!(target: LOG_TARGET, duration = ?instant.elapsed(), latest_block_number = %latest_block.block_number, "Fetched pending data.");
if let Some(data) = data {

Ok(if let Some(data) = data {
FetchDataResult::Pending(data)
} else {
FetchDataResult::None
}
})
} else {
FetchDataResult::None
};
Ok(FetchDataResult::None)
}
}

async fn get_contract_ranges(
&self,
cursors: &Cursors,
latest_block: u64,
) -> Result<HashMap<Felt, ContractRange>> {
let mut ranges = HashMap::new();
let base_from = cursors.head.unwrap_or(self.config.world_block);

for (contract_address, _) in self.contracts.iter() {
// First fetch a single chunk to find the first event block
let first_event_block =
self.get_first_event_block(base_from, latest_block, *contract_address).await?;

if let Some(first_block) = first_event_block {
let from = if first_block == 0 { first_block } else { first_block + 1 };
let total_remaining = latest_block - from;
let blocks_to_process = total_remaining.min(self.config.blocks_chunk_size);
let to = from + blocks_to_process;

ranges.insert(*contract_address, ContractRange { from, to });
}
}

Ok(result)
Ok(ranges)
}

pub async fn fetch_range(
&mut self,
async fn get_first_event_block(
&self,
from: u64,
to: u64,
cursor_map: &HashMap<Felt, Felt>,
) -> Result<FetchRangeResult> {
// Process all blocks from current to latest.
let mut fetch_all_events_tasks = VecDeque::new();

for contract in self.contracts.iter() {
let events_filter = EventFilter {
from_block: Some(BlockId::Number(from)),
to_block: Some(BlockId::Number(to)),
address: Some(*contract.0),
keys: None,
};
let token_events_pages =
get_all_events(&self.provider, events_filter, self.config.events_chunk_size);

// Prefer processing world events first
match contract.1 {
ContractType::WORLD => fetch_all_events_tasks.push_front(token_events_pages),
_ => fetch_all_events_tasks.push_back(token_events_pages),
}
}
contract_address: Felt,
) -> Result<Option<u64>> {
let events_filter = EventFilter {
from_block: Some(BlockId::Number(from)),
to_block: Some(BlockId::Number(to)),
address: Some(contract_address),
keys: None,
};

let task_result = join_all(fetch_all_events_tasks).await;
// Only fetch 1 event to find the first event block
let events_page = self.provider.get_events(events_filter, None, 1).await?;

let mut events = vec![];
Ok(events_page.events.first().and_then(|e| e.block_number.map(|b| std::cmp::max(b - 1, 0))))
}

for result in task_result {
let result = result?;
let contract_address =
result.0.expect("EventFilters that we use always have an address");
let events_pages = result.1;
let last_contract_tx = cursor_map.get(&contract_address).cloned();
let mut last_contract_tx_tmp = last_contract_tx;

debug!(target: LOG_TARGET, "Total events pages fetched for contract ({:#x}): {}", &contract_address, &events_pages.len());

for events_page in events_pages {
debug!("Processing events page with events: {}", &events_page.events.len());
for event in events_page.events {
// Then we skip all transactions until we reach the last pending processed
// transaction (if any)
if let Some(last_contract_tx) = last_contract_tx_tmp {
if event.transaction_hash != last_contract_tx {
continue;
}
async fn fetch_range(
&mut self,
contract_ranges: HashMap<Felt, ContractRange>,
) -> Result<FetchRangeResult> {
let mut fetch_all_events_tasks = VecDeque::new();

last_contract_tx_tmp = None;
}
for (contract_address, range) in contract_ranges.iter() {
if let Some(contract_type) = self.contracts.get(contract_address) {
let events_filter = EventFilter {
from_block: Some(BlockId::Number(range.from)),
to_block: Some(BlockId::Number(range.to)),
address: Some(*contract_address),
keys: None,
};

// Skip the latest pending block transaction events
// * as we might have multiple events for the same transaction
if let Some(last_contract_tx) = last_contract_tx {
if event.transaction_hash == last_contract_tx {
continue;
}
}
let token_events_pages =
get_all_events(&self.provider, events_filter, self.config.events_chunk_size);

events.push(event);
// Prioritize world events
match contract_type {
ContractType::WORLD => fetch_all_events_tasks.push_front(token_events_pages),
_ => fetch_all_events_tasks.push_back(token_events_pages),
}
}
}

// Transactions & blocks to process
let mut blocks = BTreeMap::new();
let task_results = join_all(fetch_all_events_tasks).await;

// Flatten events pages and events according to the pending block cursor
// to array of (block_number, transaction_hash)
// Process results and build return structures
let mut events = vec![];
let mut blocks = BTreeMap::new();
let mut transactions = LinkedHashMap::new();

for result in task_results {
let (_, events_pages) = result?;
events.extend(events_pages.into_iter().flat_map(|e| e.events));
}

// Process events into transactions and blocks
let mut block_set = HashSet::new();
for event in events {
let block_number = match event.block_number {
Some(block_number) => block_number,
None => unreachable!("In fetch range all events should have block number"),
};
let block_number =
event.block_number.expect("In fetch range all events should have block number");

block_set.insert(block_number);

Expand All @@ -440,6 +462,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
.push(event);
}

// Fetch block timestamps concurrently
let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks));
let mut set: JoinSet<Result<(u64, u64), anyhow::Error>> = JoinSet::new();

Expand All @@ -448,7 +471,6 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
let provider = self.provider.clone();
set.spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
debug!("Fetching block timestamp for block number: {}", block_number);
let block_timestamp = get_block_timestamp(&provider, block_number).await?;
Ok((block_number, block_timestamp))
});
Expand All @@ -459,10 +481,9 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
blocks.insert(block_number, block_timestamp);
}

debug!("Transactions: {}", &transactions.len());
debug!("Blocks: {}", &blocks.len());
let latest_block_number = contract_ranges.values().map(|r| r.to).max().unwrap_or(0);

Ok(FetchRangeResult { transactions, blocks, latest_block_number: to })
Ok(FetchRangeResult { transactions, blocks, latest_block_number })
}

async fn fetch_pending(
Expand Down Expand Up @@ -494,7 +515,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
}))
}

pub async fn process(&mut self, fetch_result: FetchDataResult) -> Result<()> {
async fn process(&mut self, fetch_result: FetchDataResult) -> Result<()> {
match fetch_result {
FetchDataResult::Range(data) => self.process_range(data).await?,
FetchDataResult::Pending(data) => self.process_pending(data).await?,
Expand All @@ -504,7 +525,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
Ok(())
}

pub async fn process_pending(&mut self, data: FetchPendingResult) -> Result<()> {
async fn process_pending(&mut self, data: FetchPendingResult) -> Result<()> {
// Skip transactions that have been processed already
// Our cursor is the last processed transaction

Expand Down Expand Up @@ -550,10 +571,11 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
Ok(())
}

pub async fn process_range(&mut self, data: FetchRangeResult) -> Result<()> {
async fn process_range(&mut self, data: FetchRangeResult) -> Result<()> {
// Process all transactions
let mut processed_blocks = HashSet::new();
let mut cursor_map = HashMap::new();

for ((block_number, transaction_hash), events) in data.transactions {
debug!("Processing transaction hash: {:#x}", transaction_hash);
// Process transaction
Expand Down
Loading