From 7da774ddd8053a1665fb29a4bd8f2c1e28f27b63 Mon Sep 17 00:00:00 2001
From: Lukas Wirth
Date: Sat, 15 Mar 2025 13:49:27 +0100
Subject: [PATCH 1/4] Use a `Vec` for `CycleHeads`

The number of cycle participants is generally low, so a hashset likely has
more overhead than a simple vec.
---
 src/cycle.rs       | 62 ++++++++++++++++++++++++----------------------
 src/zalsa_local.rs |  5 ++--
 2 files changed, 35 insertions(+), 32 deletions(-)

diff --git a/src/cycle.rs b/src/cycle.rs
index b5a0554b5..cdd41b749 100644
--- a/src/cycle.rs
+++ b/src/cycle.rs
@@ -50,7 +50,6 @@
 //! for each iteration of the outer cycle.
 
 use crate::key::DatabaseKeyIndex;
-use rustc_hash::FxHashSet;
 
 /// The maximum number of times we'll fixpoint-iterate before panicking.
 ///
@@ -91,7 +90,7 @@ pub enum CycleRecoveryStrategy {
 /// the cycle head(s) (can be plural in case of nested cycles) representing the cycles it is part
 /// of. This struct tracks these cycle heads.
 #[derive(Clone, Debug, Default)]
-pub(crate) struct CycleHeads(Option<Box<FxHashSet<DatabaseKeyIndex>>>);
+pub(crate) struct CycleHeads(Option<Box<Vec<DatabaseKeyIndex>>>);
 
 impl CycleHeads {
     pub(crate) fn is_empty(&self) -> bool {
@@ -105,15 +104,16 @@ impl CycleHeads {
     }
 
     pub(crate) fn remove(&mut self, value: &DatabaseKeyIndex) -> bool {
-        if let Some(cycle_heads) = self.0.as_mut() {
-            let found = cycle_heads.remove(value);
-            if found && cycle_heads.is_empty() {
-                self.0.take();
-            }
-            found
-        } else {
-            false
+        let Some(cycle_heads) = &mut self.0 else {
+            return false;
+        };
+        let found = cycle_heads.iter().position(|&head| head == *value);
+        let Some(found) = found else { return false };
+        cycle_heads.swap_remove(found);
+        if cycle_heads.is_empty() {
+            self.0.take();
         }
+        true
     }
 }
 
@@ -121,36 +121,39 @@ impl std::iter::Extend<DatabaseKeyIndex> for CycleHeads {
     fn extend<T: IntoIterator<Item = DatabaseKeyIndex>>(&mut self, iter: T) {
         let mut iter = iter.into_iter();
         if let Some(first) = iter.next() {
-            let heads = self.0.get_or_insert(Box::new(FxHashSet::default()));
-            heads.insert(first);
-            heads.extend(iter)
+            let heads = &mut **self.0.get_or_insert_with(|| Box::new(Vec::new()));
+            if !heads.contains(&first) {
+                heads.push(first);
+            }
+            for head in iter {
+                if !heads.contains(&head) {
+                    heads.push(head);
+                }
+            }
         }
     }
 }
 
-impl std::iter::IntoIterator for CycleHeads {
+impl IntoIterator for CycleHeads {
     type Item = DatabaseKeyIndex;
-    type IntoIter = std::collections::hash_set::IntoIter<DatabaseKeyIndex>;
+    type IntoIter = <Vec<DatabaseKeyIndex> as IntoIterator>::IntoIter;
 
     fn into_iter(self) -> Self::IntoIter {
         self.0.map(|heads| *heads).unwrap_or_default().into_iter()
     }
 }
 
-// This type can be removed once MSRV is 1.83+ and we have Default for hashset iterators.
-pub(crate) struct CycleHeadsIter<'a>(
-    Option<std::collections::hash_set::Iter<'a, DatabaseKeyIndex>>,
-);
+pub(crate) struct CycleHeadsIter<'a>(std::slice::Iter<'a, DatabaseKeyIndex>);
 
 impl Iterator for CycleHeadsIter<'_> {
     type Item = DatabaseKeyIndex;
 
     fn next(&mut self) -> Option<Self::Item> {
-        self.0.as_mut()?.next().copied()
+        self.0.next().copied()
     }
 
     fn last(self) -> Option<Self::Item> {
-        self.0?.last().copied()
+        self.0.last().copied()
     }
 }
 
@@ -161,17 +164,18 @@ impl<'a> std::iter::IntoIterator for &'a CycleHeads {
     type IntoIter = CycleHeadsIter<'a>;
 
     fn into_iter(self) -> Self::IntoIter {
-        CycleHeadsIter(self.0.as_ref().map(|heads| heads.iter()))
+        CycleHeadsIter(
+            self.0
+                .as_ref()
+                .map(|heads| heads.iter())
+                .unwrap_or_default(),
+        )
     }
 }
 
-impl From<FxHashSet<DatabaseKeyIndex>> for CycleHeads {
-    fn from(value: FxHashSet<DatabaseKeyIndex>) -> Self {
-        Self(if value.is_empty() {
-            None
-        } else {
-            Some(Box::new(value))
-        })
+impl From<DatabaseKeyIndex> for CycleHeads {
+    fn from(value: DatabaseKeyIndex) -> Self {
+        Self(Some(Box::new(vec![value])))
     }
 }
 
diff --git a/src/zalsa_local.rs b/src/zalsa_local.rs
index b9db8b641..bd96e3431 100644
--- a/src/zalsa_local.rs
+++ b/src/zalsa_local.rs
@@ -1,4 +1,4 @@
-use rustc_hash::{FxHashMap, FxHashSet};
+use rustc_hash::FxHashMap;
 use tracing::debug;
 
 use crate::accumulator::accumulated_map::{
@@ -324,7 +324,6 @@ pub(crate) struct QueryRevisions {
 
 impl QueryRevisions {
     pub(crate) fn fixpoint_initial(query: DatabaseKeyIndex, revision: Revision) -> Self {
-        let cycle_heads = FxHashSet::from_iter([query]).into();
         Self {
             changed_at: revision,
             durability: Durability::MAX,
@@ -332,7 +331,7 @@ impl QueryRevisions {
             tracked_struct_ids: Default::default(),
             accumulated: Default::default(),
             accumulated_inputs: Default::default(),
-            cycle_heads,
+            cycle_heads: CycleHeads::from(query),
         }
     }
 

From efec0f940560afe06fb0e6b2a783bd06f3a8de84 Mon Sep 17 00:00:00 2001
From: Lukas Wirth
Date: Sat, 15 Mar 2025 14:03:29 +0100
Subject: [PATCH 2/4] Use `CycleHeads` type in `VerifyResult`

---
 src/cycle.rs                        | 28 +++++++++++++++++++++++---
 src/function/maybe_changed_after.rs | 31 ++++++++++++++++++-----------
 2 files changed, 44 insertions(+), 15 deletions(-)

diff --git a/src/cycle.rs b/src/cycle.rs
index cdd41b749..e20ea0f7f 100644
--- a/src/cycle.rs
+++ b/src/cycle.rs
@@ -90,7 +90,8 @@ pub enum CycleRecoveryStrategy {
 /// the cycle head(s) (can be plural in case of nested cycles) representing the cycles it is part
 /// of. This struct tracks these cycle heads.
 #[derive(Clone, Debug, Default)]
-pub(crate) struct CycleHeads(Option<Box<Vec<DatabaseKeyIndex>>>);
+#[allow(clippy::box_collection)]
+pub struct CycleHeads(Option<Box<Vec<DatabaseKeyIndex>>>);
 
 impl CycleHeads {
     pub(crate) fn is_empty(&self) -> bool {
@@ -115,6 +116,17 @@ impl CycleHeads {
         }
         true
     }
+
+    #[inline]
+    pub(crate) fn insert_into(self, cycle_heads: &mut Vec<DatabaseKeyIndex>) {
+        if let Some(heads) = self.0 {
+            for head in *heads {
+                if !cycle_heads.contains(&head) {
+                    cycle_heads.push(head);
+                }
+            }
+        }
+    }
 }
 
 impl std::iter::Extend<DatabaseKeyIndex> for CycleHeads {
@@ -143,7 +155,7 @@ impl IntoIterator for CycleHeads {
     }
 }
 
-pub(crate) struct CycleHeadsIter<'a>(std::slice::Iter<'a, DatabaseKeyIndex>);
+pub struct CycleHeadsIter<'a>(std::slice::Iter<'a, DatabaseKeyIndex>);
 
 impl Iterator for CycleHeadsIter<'_> {
     type Item = DatabaseKeyIndex;
@@ -179,4 +191,14 @@ impl From<DatabaseKeyIndex> for CycleHeads {
     }
 }
 
-pub(crate) static EMPTY_CYCLE_HEADS: CycleHeads = CycleHeads(None);
+impl From<Vec<DatabaseKeyIndex>> for CycleHeads {
+    fn from(value: Vec<DatabaseKeyIndex>) -> Self {
+        Self(if value.is_empty() {
+            None
+        } else {
+            Some(Box::new(value))
+        })
+    }
+}
+
+pub(crate) const EMPTY_CYCLE_HEADS: CycleHeads = CycleHeads(None);
diff --git a/src/function/maybe_changed_after.rs b/src/function/maybe_changed_after.rs
index a46343f4e..49e9dd185 100644
--- a/src/function/maybe_changed_after.rs
+++ b/src/function/maybe_changed_after.rs
@@ -1,13 +1,12 @@
 use crate::{
     accumulator::accumulated_map::InputAccumulatedValues,
-    cycle::CycleRecoveryStrategy,
+    cycle::{CycleHeads, CycleRecoveryStrategy},
     key::DatabaseKeyIndex,
     table::sync::ClaimResult,
     zalsa::{MemoIngredientIndex, Zalsa, ZalsaDatabase},
     zalsa_local::{ActiveQueryGuard, QueryEdge, QueryOrigin},
     AsDynDatabase as _, Id, Revision,
 };
-use rustc_hash::FxHashSet;
 use std::sync::atomic::Ordering;
 
 use super::{memo::Memo, Configuration, IngredientImpl};
@@ -22,9 +21,9 @@ pub enum VerifyResult {
     /// The first inner value tracks whether the memo or any of its dependencies have an
     /// accumulated value.
     ///
-    /// Database keys in the hashset represent cycle heads encountered in validation; don't mark
+    /// The second is the cycle heads encountered in validation; don't mark
     /// memos verified until we've iterated the full cycle to ensure no inputs changed.
-    Unchanged(InputAccumulatedValues, FxHashSet<DatabaseKeyIndex>),
+    Unchanged(InputAccumulatedValues, CycleHeads),
 }
 
 impl VerifyResult {
@@ -37,7 +36,7 @@ impl VerifyResult {
     }
 
     pub(crate) fn unchanged() -> Self {
-        Self::Unchanged(InputAccumulatedValues::Empty, FxHashSet::default())
+        Self::Unchanged(InputAccumulatedValues::Empty, CycleHeads::default())
     }
 }
 
@@ -69,7 +68,7 @@ where
             } else {
                 VerifyResult::Unchanged(
                     memo.revisions.accumulated_inputs.load(),
-                    FxHashSet::default(),
+                    CycleHeads::default(),
                 )
             };
         }
@@ -112,7 +111,7 @@ where
                 CycleRecoveryStrategy::Fixpoint => {
                     return Some(VerifyResult::Unchanged(
                         InputAccumulatedValues::Empty,
-                        FxHashSet::from_iter([database_key_index]),
+                        CycleHeads::from(database_key_index),
                     ));
                 }
             },
@@ -158,7 +157,7 @@ where
                     Some(_) => InputAccumulatedValues::Any,
                     None => memo.revisions.accumulated_inputs.load(),
                 },
-                FxHashSet::default(),
+                CycleHeads::default(),
             )
         });
     }
@@ -282,7 +281,7 @@ where
             return VerifyResult::Changed;
         }
 
-        let mut cycle_heads = FxHashSet::default();
+        let mut cycle_heads = vec![];
         loop {
             let inputs = match &old_memo.revisions.origin {
                 QueryOrigin::Assigned(_) => {
@@ -324,7 +323,7 @@ where
                         {
                             VerifyResult::Changed => return VerifyResult::Changed,
                             VerifyResult::Unchanged(input_accumulated, cycles) => {
-                                cycle_heads.extend(cycles);
+                                cycles.insert_into(&mut cycle_heads);
                                 inputs |= input_accumulated;
                             }
                         }
@@ -384,7 +383,12 @@ where
             // from cycle heads. We will handle our own memo (and the rest of our cycle) on a
             // future iteration; first the outer cycle head needs to verify itself.
 
-            let in_heads = cycle_heads.remove(&database_key_index);
+            let found = cycle_heads
+                .iter()
+                .position(|&head| head == database_key_index);
+            let in_heads = found
+                .inspect(|&head| _ = cycle_heads.swap_remove(head))
+                .is_some();
 
             if cycle_heads.is_empty() {
                 old_memo.mark_as_verified(db, zalsa.current_revision(), database_key_index, inputs);
@@ -398,7 +402,10 @@ where
                     continue;
                 }
             }
-            return VerifyResult::Unchanged(InputAccumulatedValues::Empty, cycle_heads);
+            return VerifyResult::Unchanged(
+                InputAccumulatedValues::Empty,
+                CycleHeads::from(cycle_heads),
+            );
         }
     }
 }

From a13bcba5dd1d714f1f9c1ed31d825b89435d416b Mon Sep 17 00:00:00 2001
From: Lukas Wirth
Date: Sat, 15 Mar 2025 14:37:14 +0100
Subject: [PATCH 3/4] Simplify some `report_tracked_read` calls

---
 src/active_query.rs                 | 11 +++++++++++
 src/function/maybe_changed_after.rs |  5 ++---
 src/input.rs                        |  7 ++-----
 src/interned.rs                     |  6 +-----
 src/tracked_struct.rs               | 11 +++--------
 src/zalsa_local.rs                  | 18 ++++++++++++++++++
 6 files changed, 37 insertions(+), 21 deletions(-)

diff --git a/src/active_query.rs b/src/active_query.rs
index 9156d2e3f..b2f82b14c 100644
--- a/src/active_query.rs
+++ b/src/active_query.rs
@@ -94,6 +94,17 @@ impl ActiveQuery {
         self.cycle_heads.extend(cycle_heads);
     }
 
+    pub(super) fn add_read_simple(
+        &mut self,
+        input: InputDependencyIndex,
+        durability: Durability,
+        revision: Revision,
+    ) {
+        self.input_outputs.insert(QueryEdge::Input(input));
+        self.durability = self.durability.min(durability);
+        self.changed_at = self.changed_at.max(revision);
+    }
+
     pub(super) fn add_untracked_read(&mut self, changed_at: Revision) {
         self.untracked_read = true;
         self.durability = Durability::MIN;
diff --git a/src/function/maybe_changed_after.rs b/src/function/maybe_changed_after.rs
index 49e9dd185..4a322e7e5 100644
--- a/src/function/maybe_changed_after.rs
+++ b/src/function/maybe_changed_after.rs
@@ -383,10 +383,9 @@ where
             // from cycle heads. We will handle our own memo (and the rest of our cycle) on a
             // future iteration; first the outer cycle head needs to verify itself.
 
-            let found = cycle_heads
+            let in_heads = cycle_heads
                 .iter()
-                .position(|&head| head == database_key_index);
-            let in_heads = found
+                .position(|&head| head == database_key_index)
                 .inspect(|&head| _ = cycle_heads.swap_remove(head))
                 .is_some();
 
diff --git a/src/input.rs b/src/input.rs
index 75d28b5d2..4eb6be1e6 100644
--- a/src/input.rs
+++ b/src/input.rs
@@ -11,8 +11,7 @@ pub mod singleton;
 use input_field::FieldIngredientImpl;
 
 use crate::{
-    accumulator::accumulated_map::InputAccumulatedValues,
-    cycle::{CycleRecoveryStrategy, EMPTY_CYCLE_HEADS},
+    cycle::CycleRecoveryStrategy,
     function::VerifyResult,
     id::{AsId, FromIdWithDb},
     ingredient::{fmt_index, Ingredient},
@@ -178,12 +177,10 @@ impl<C: Configuration> IngredientImpl<C> {
         let id = id.as_id();
         let value = Self::data(zalsa, id);
         let stamp = &value.stamps[field_index];
-        zalsa_local.report_tracked_read(
+        zalsa_local.report_tracked_read_simple(
             InputDependencyIndex::new(field_ingredient_index, id),
             stamp.durability,
             stamp.changed_at,
-            InputAccumulatedValues::Empty,
-            &EMPTY_CYCLE_HEADS,
         );
         &value.fields
     }
diff --git a/src/interned.rs b/src/interned.rs
index 2e1ed07f3..e1087c1c9 100644
--- a/src/interned.rs
+++ b/src/interned.rs
@@ -1,7 +1,5 @@
 use dashmap::SharedValue;
 
-use crate::accumulator::accumulated_map::InputAccumulatedValues;
-use crate::cycle::EMPTY_CYCLE_HEADS;
 use crate::durability::Durability;
 use crate::function::VerifyResult;
 use crate::ingredient::fmt_index;
@@ -180,12 +178,10 @@ where
         C::Fields<'db>: HashEqLike<Key>,
     {
         let zalsa_local = db.zalsa_local();
-        zalsa_local.report_tracked_read(
+        zalsa_local.report_tracked_read_simple(
             InputDependencyIndex::for_table(self.ingredient_index),
             Durability::MAX,
             self.reset_at,
-            InputAccumulatedValues::Empty,
-            &EMPTY_CYCLE_HEADS,
         );
 
         // Optimization to only get read lock on the map if the data has already been interned.
diff --git a/src/tracked_struct.rs b/src/tracked_struct.rs
index a3b01dc1d..b5d94a07c 100644
--- a/src/tracked_struct.rs
+++ b/src/tracked_struct.rs
@@ -4,8 +4,7 @@ use crossbeam_queue::SegQueue;
 use tracked_field::FieldIngredientImpl;
 
 use crate::{
-    accumulator::accumulated_map::InputAccumulatedValues,
-    cycle::{CycleRecoveryStrategy, EMPTY_CYCLE_HEADS},
+    cycle::CycleRecoveryStrategy,
     function::VerifyResult,
     ingredient::{fmt_index, Ingredient, Jar},
     key::{DatabaseKeyIndex, InputDependencyIndex},
@@ -677,12 +676,10 @@ where
         let field_changed_at = data.revisions[relative_tracked_index];
 
-        zalsa_local.report_tracked_read(
+        zalsa_local.report_tracked_read_simple(
             InputDependencyIndex::new(field_ingredient_index, id),
             data.durability,
             field_changed_at,
-            InputAccumulatedValues::Empty,
-            &EMPTY_CYCLE_HEADS,
         );
 
         unsafe { self.to_self_ref(&data.fields) }
     }
@@ -704,12 +701,10 @@ where
         data.read_lock(zalsa.current_revision());
 
         // Add a dependency on the tracked struct itself.
-        zalsa_local.report_tracked_read(
+        zalsa_local.report_tracked_read_simple(
             InputDependencyIndex::new(self.ingredient_index, id),
             data.durability,
             data.created_at,
-            InputAccumulatedValues::Empty,
-            &EMPTY_CYCLE_HEADS,
         );
 
         unsafe { self.to_self_ref(&data.fields) }
diff --git a/src/zalsa_local.rs b/src/zalsa_local.rs
index bd96e3431..1962f6622 100644
--- a/src/zalsa_local.rs
+++ b/src/zalsa_local.rs
@@ -183,6 +183,24 @@ impl ZalsaLocal {
         })
     }
 
+    /// Register that currently active query reads the given input
+    pub(crate) fn report_tracked_read_simple(
+        &self,
+        input: InputDependencyIndex,
+        durability: Durability,
+        changed_at: Revision,
+    ) {
+        debug!(
+            "report_tracked_read(input={:?}, durability={:?}, changed_at={:?})",
+            input, durability, changed_at
+        );
+        self.with_query_stack(|stack| {
+            if let Some(top_query) = stack.last_mut() {
+                top_query.add_read_simple(input, durability, changed_at);
+            }
+        })
+    }
+
     /// Register that the current query read an untracked value
     ///
     /// # Parameters

From 5b718d7c8acc67aabca3b2020b1bd103ed1ba917 Mon Sep 17 00:00:00 2001
From: Lukas Wirth
Date: Sat, 15 Mar 2025 18:08:39 +0100
Subject: [PATCH 4/4] Simplify `CycleHeads` ref iterator

---
 src/cycle.rs                        | 15 +++------
 src/function/maybe_changed_after.rs |  9 +++---
 src/function/memo.rs                | 49 +++++++++++++++--------------
 3 files changed, 35 insertions(+), 38 deletions(-)

diff --git a/src/cycle.rs b/src/cycle.rs
index e20ea0f7f..f8634db30 100644
--- a/src/cycle.rs
+++ b/src/cycle.rs
@@ -127,21 +127,16 @@ impl CycleHeads {
             }
         }
     }
-}
 
-impl std::iter::Extend<DatabaseKeyIndex> for CycleHeads {
-    fn extend<T: IntoIterator<Item = DatabaseKeyIndex>>(&mut self, iter: T) {
-        let mut iter = iter.into_iter();
-        if let Some(first) = iter.next() {
+    pub(crate) fn extend(&mut self, other: &Self) {
+        if let Some(other) = &other.0 {
             let heads = &mut **self.0.get_or_insert_with(|| Box::new(Vec::new()));
-            if !heads.contains(&first) {
-                heads.push(first);
-            }
-            for head in iter {
+            heads.reserve(other.len());
+            other.iter().for_each(|&head| {
                 if !heads.contains(&head) {
                     heads.push(head);
                 }
-            }
+            });
         }
     }
 }
diff --git a/src/function/maybe_changed_after.rs b/src/function/maybe_changed_after.rs
index 4a322e7e5..201c20c15 100644
--- a/src/function/maybe_changed_after.rs
+++ b/src/function/maybe_changed_after.rs
@@ -239,13 +239,12 @@ where
         zalsa: &Zalsa,
         memo: &Memo<C::Output<'_>>,
     ) -> bool {
-        for cycle_head in &memo.revisions.cycle_heads {
-            if zalsa
+        if (&memo.revisions.cycle_heads).into_iter().any(|cycle_head| {
+            zalsa
                 .lookup_ingredient(cycle_head.ingredient_index)
                 .is_provisional_cycle_head(db.as_dyn_database(), cycle_head.key_index)
-            {
-                return false;
-            }
+        }) {
+            return false;
         }
         // Relaxed is sufficient here because there are no other writes we need to ensure have
         // happened before marking this memo as verified-final.
diff --git a/src/function/memo.rs b/src/function/memo.rs
index 390e4d587..6a70979d9 100644
--- a/src/function/memo.rs
+++ b/src/function/memo.rs
@@ -177,29 +177,32 @@ impl<V> Memo<V> {
         database_key_index: DatabaseKeyIndex,
     ) -> bool {
         let mut retry = false;
-        for head in self.cycle_heads() {
-            if head == database_key_index {
-                continue;
-            }
-            let ingredient = db.zalsa().lookup_ingredient(head.ingredient_index);
-            if !ingredient.is_provisional_cycle_head(db, head.key_index) {
-                // This cycle is already finalized, so we don't need to wait on it;
-                // keep looping through cycle heads.
- retry = true; - continue; - } - if ingredient.wait_for(db, head.key_index) { - // There's a new memo available for the cycle head; fetch our own - // updated memo and see if it's still provisional or if the cycle - // has resolved. - retry = true; - continue; - } else { - // We hit a cycle blocking on the cycle head; this means it's in - // our own active query stack and we are responsible to resolve the - // cycle, so go ahead and return the provisional memo. - return false; - } + let hit_cycle = self + .cycle_heads() + .into_iter() + .filter(|&head| head != database_key_index) + .any(|head| { + let ingredient = db.zalsa().lookup_ingredient(head.ingredient_index); + if !ingredient.is_provisional_cycle_head(db, head.key_index) { + // This cycle is already finalized, so we don't need to wait on it; + // keep looping through cycle heads. + retry = true; + false + } else if ingredient.wait_for(db, head.key_index) { + // There's a new memo available for the cycle head; fetch our own + // updated memo and see if it's still provisional or if the cycle + // has resolved. + retry = true; + false + } else { + // We hit a cycle blocking on the cycle head; this means it's in + // our own active query stack and we are responsible to resolve the + // cycle, so go ahead and return the provisional memo. + true + } + }); + if hit_cycle { + return false; } // If `retry` is `true`, all our cycle heads (barring ourself) are complete; re-fetch // and we should get a non-provisional memo. If we get here and `retry` is still
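
Illustrative aside, not part of the patch series: the sketch below uses a hypothetical `SmallSet`
type to show the Vec-backed set behaviour that PATCH 1 adopts for `CycleHeads`, namely dedup on
insert via a linear `contains` scan and removal via `position` plus `swap_remove`. For the small
number of cycle heads involved, this likely avoids the hashing overhead of an `FxHashSet`.

    // Standalone sketch, not salsa code: a Vec-backed "set" in the spirit of `CycleHeads`.
    #[derive(Default, Debug)]
    struct SmallSet<T: PartialEq> {
        items: Vec<T>,
    }

    impl<T: PartialEq> SmallSet<T> {
        // Push only if the value is not already present, mirroring the
        // dedup-on-insert behaviour of `CycleHeads::extend` / `insert_into` above.
        fn insert(&mut self, value: T) -> bool {
            if self.items.contains(&value) {
                false
            } else {
                self.items.push(value);
                true
            }
        }

        // Remove by value with `swap_remove`, like `CycleHeads::remove`:
        // element order is not preserved, but removal is O(1) after the linear scan.
        fn remove(&mut self, value: &T) -> bool {
            match self.items.iter().position(|item| item == value) {
                Some(index) => {
                    self.items.swap_remove(index);
                    true
                }
                None => false,
            }
        }
    }

    fn main() {
        let mut heads = SmallSet::default();
        assert!(heads.insert(1));
        assert!(!heads.insert(1)); // duplicate is ignored
        assert!(heads.insert(2));
        assert!(heads.remove(&1));
        assert!(!heads.remove(&3));
        println!("{:?}", heads.items);
    }

Giving up element order through `swap_remove` is harmless because the collection is treated purely
as a set, which is the same trade-off `CycleHeads::remove` makes in PATCH 1.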