Skip to content

Commit

Permalink
Things
Browse files Browse the repository at this point in the history
  • Loading branch information
Veykril committed Mar 2, 2025
1 parent 4d92253 commit 9e2a27a
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 56 deletions.
4 changes: 2 additions & 2 deletions src/function/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ where

// If we already executed this query once, then use the tracked-struct ids from the
// previous execution as the starting point for the new one.
if let Some(old_memo) = &opt_old_memo {
if let Some(old_memo) = opt_old_memo {
active_query.seed_tracked_struct_ids(&old_memo.revisions.tracked_struct_ids);
}

Expand Down Expand Up @@ -75,7 +75,7 @@ where
// really change, even if some of its inputs have. So we can
// "backdate" its `changed_at` revision to be the same as the
// old value.
if let Some(old_memo) = &opt_old_memo {
if let Some(old_memo) = opt_old_memo {
self.backdate_if_appropriate(old_memo, &mut revisions, &value);
self.diff_outputs(zalsa, db, database_key_index, old_memo, &mut revisions);
}
Expand Down
25 changes: 12 additions & 13 deletions src/function/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
accumulator::accumulated_map::InputAccumulatedValues,
runtime::StampedValue,
zalsa::{Zalsa, ZalsaDatabase},
AsDynDatabase as _, Id,
Id,
};

impl<C> IngredientImpl<C>
Expand All @@ -16,11 +16,14 @@ where
zalsa.unwind_if_revision_cancelled(db);

let memo = self.refresh_memo(db, id);
// SAFETY: We just refreshed the memo so it is guaranteed to contain a value now.
let StampedValue {
value,
durability,
changed_at,
} = memo.revisions.stamped_value(memo.value.as_ref().unwrap());
} = memo
.revisions
.stamped_value(unsafe { memo.value.as_ref().unwrap_unchecked() });

self.lru.record_use(id);

Expand Down Expand Up @@ -64,7 +67,7 @@ where
memo_ingredient_index: MemoIngredientIndex,
) -> Option<&'db Memo<C::Output<'db>>> {
let memo_guard = self.get_memo_from_table_for(zalsa, id, memo_ingredient_index);
if let Some(memo) = &memo_guard {
if let Some(memo) = memo_guard {
if memo.value.is_some()
&& self.shallow_verify_memo(db, zalsa, self.database_key_index(id), memo)
{
Expand All @@ -83,24 +86,20 @@ where
id: Id,
memo_ingredient_index: MemoIngredientIndex,
) -> Option<&'db Memo<C::Output<'db>>> {
let zalsa_local = db.zalsa_local();
let database_key_index = self.database_key_index(id);

// Try to claim this query: if someone else has claimed it already, go back and start again.
let _claim_guard = zalsa.sync_table_for(id).claim(
db.as_dyn_database(),
zalsa,
zalsa_local,
database_key_index,
memo_ingredient_index,
)?;
let _claim_guard =
zalsa
.sync_table_for(id)
.claim(db, zalsa, database_key_index, memo_ingredient_index)?;

// Push the query on the stack.
let active_query = zalsa_local.push_query(database_key_index);
let active_query = db.zalsa_local().push_query(database_key_index);

// Now that we've claimed the item, check again to see if there's a "hot" value.
let opt_old_memo = self.get_memo_from_table_for(zalsa, id, memo_ingredient_index);
if let Some(old_memo) = &opt_old_memo {
if let Some(old_memo) = opt_old_memo {
if old_memo.value.is_some() && self.deep_verify_memo(db, zalsa, old_memo, &active_query)
{
// Unsafety invariant: memo is present in memo_map and we have verified that it is
Expand Down
8 changes: 8 additions & 0 deletions src/function/lru.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,26 @@ impl Lru {
}
}

#[inline(always)]
pub(super) fn record_use(&self, index: Id) {
if self.capacity.is_none() {
// LRU is disabled
return;
}
self.insert(index);
}

#[inline(never)]
fn insert(&self, index: Id) {
let mut set = self.set.lock();
set.insert(index);
}

pub(super) fn set_capacity(&mut self, capacity: usize) {
self.capacity = NonZeroUsize::new(capacity);
if self.capacity.is_none() {
self.set.get_mut().clear();
}
}

pub(super) fn for_each_evicted(&mut self, mut cb: impl FnMut(Id)) {
Expand Down
8 changes: 3 additions & 5 deletions src/function/maybe_changed_after.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ where

// Check if we have a verified version: this is the hot path.
let memo_guard = self.get_memo_from_table_for(zalsa, id, memo_ingredient_index);
if let Some(memo) = &memo_guard {
if let Some(memo) = memo_guard {
if self.shallow_verify_memo(db, zalsa, database_key_index, memo) {
return if memo.revisions.changed_at > revision {
MaybeChangedAfter::Yes
Expand Down Expand Up @@ -62,15 +62,13 @@ where
) -> Option<MaybeChangedAfter> {
let database_key_index = self.database_key_index(key_index);

let zalsa_local = db.zalsa_local();
let _claim_guard = zalsa.sync_table_for(key_index).claim(
db.as_dyn_database(),
db,
zalsa,
zalsa_local,
database_key_index,
memo_ingredient_index,
)?;
let active_query = zalsa_local.push_query(database_key_index);
let active_query = db.zalsa_local().push_query(database_key_index);

// Load the current memo, if any.
let Some(old_memo) = self.get_memo_from_table_for(zalsa, key_index, memo_ingredient_index)
Expand Down
59 changes: 23 additions & 36 deletions src/table/sync.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
use std::{
sync::atomic::{AtomicBool, Ordering},
thread::ThreadId,
};
use std::thread::ThreadId;

use parking_lot::RwLock;
use parking_lot::Mutex;

use crate::{
key::DatabaseKeyIndex,
runtime::WaitResult,
zalsa::{MemoIngredientIndex, Zalsa},
zalsa_local::ZalsaLocal,
Database,
};

Expand All @@ -19,36 +15,35 @@ use super::util;
/// worker threads.
#[derive(Default)]
pub(crate) struct SyncTable {
syncs: RwLock<Vec<Option<SyncState>>>,
syncs: Mutex<Vec<Option<SyncState>>>,
}

struct SyncState {
id: ThreadId,

/// Set to true if any other queries are blocked,
/// waiting for this query to complete.
anyone_waiting: AtomicBool,
anyone_waiting: bool,
}

impl SyncTable {
pub(crate) fn claim<'me>(
&'me self,
db: &'me dyn Database,
db: &'me (impl ?Sized + Database),
zalsa: &'me Zalsa,
zalsa_local: &ZalsaLocal,
database_key_index: DatabaseKeyIndex,
memo_ingredient_index: MemoIngredientIndex,
) -> Option<ClaimGuard<'me>> {
let mut syncs = self.syncs.write();
let mut syncs = self.syncs.lock();
let thread_id = std::thread::current().id();

util::ensure_vec_len(&mut syncs, memo_ingredient_index.as_usize() + 1);

match &syncs[memo_ingredient_index.as_usize()] {
match &mut syncs[memo_ingredient_index.as_usize()] {
None => {
syncs[memo_ingredient_index.as_usize()] = Some(SyncState {
id: thread_id,
anyone_waiting: AtomicBool::new(false),
anyone_waiting: false,
});
Some(ClaimGuard {
database_key_index,
Expand All @@ -61,16 +56,10 @@ impl SyncTable {
id: other_id,
anyone_waiting,
}) => {
// NB: `Ordering::Relaxed` is sufficient here,
// as there are no loads that are "gated" on this
// value. Everything that is written is also protected
// by a lock that must be acquired. The role of this
// boolean is to decide *whether* to acquire the lock,
// not to gate future atomic reads.
anyone_waiting.store(true, Ordering::Relaxed);
*anyone_waiting = true;
zalsa.runtime().block_on_or_unwind(
db,
zalsa_local,
db.as_dyn_database(),
db.zalsa_local(),
database_key_index,
*other_id,
syncs,
Expand All @@ -92,30 +81,28 @@ pub(crate) struct ClaimGuard<'me> {
}

impl ClaimGuard<'_> {
fn remove_from_map_and_unblock_queries(&self, wait_result: WaitResult) {
let mut syncs = self.sync_table.syncs.write();
fn remove_from_map_and_unblock_queries(&self) {
let mut syncs = self.sync_table.syncs.lock();

let SyncState { anyone_waiting, .. } =
syncs[self.memo_ingredient_index.as_usize()].take().unwrap();

// NB: `Ordering::Relaxed` is sufficient here,
// see `store` above for explanation.
if anyone_waiting.load(Ordering::Relaxed) {
self.zalsa
.runtime()
.unblock_queries_blocked_on(self.database_key_index, wait_result)
if anyone_waiting {
self.zalsa.runtime().unblock_queries_blocked_on(
self.database_key_index,
if std::thread::panicking() {
WaitResult::Panicked
} else {
WaitResult::Completed
},
)
}
}
}

impl Drop for ClaimGuard<'_> {
fn drop(&mut self) {
let wait_result = if std::thread::panicking() {
WaitResult::Panicked
} else {
WaitResult::Completed
};
self.remove_from_map_and_unblock_queries(wait_result)
self.remove_from_map_and_unblock_queries()
}
}

Expand Down

0 comments on commit 9e2a27a

Please sign in to comment.