Merged
11 changes: 11 additions & 0 deletions src/active_query.rs
@@ -94,6 +94,17 @@ impl ActiveQuery {
         self.cycle_heads.extend(cycle_heads);
     }
 
+    pub(super) fn add_read_simple(
+        &mut self,
+        input: InputDependencyIndex,
+        durability: Durability,
+        revision: Revision,
+    ) {
+        self.input_outputs.insert(QueryEdge::Input(input));
+        self.durability = self.durability.min(durability);
+        self.changed_at = self.changed_at.max(revision);
+    }
+
     pub(super) fn add_untracked_read(&mut self, changed_at: Revision) {
         self.untracked_read = true;
         self.durability = Durability::MIN;
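
The new `add_read_simple` fast path records the dependency edge and folds the input's stamp into the active query: a query is at most as durable as its least-durable input, and it changed at least as recently as its most-recently-changed input. A standalone sketch of that folding rule (edge tracking elided; the newtypes below are simplified stand-ins for Salsa's `Durability` and `Revision`, not its actual definitions):

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct Durability(u8); // stand-in for Salsa's Durability
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct Revision(u64); // stand-in for Salsa's Revision

struct Summary {
    durability: Durability,
    changed_at: Revision,
}

impl Summary {
    // Mirrors the two updates in `add_read_simple`.
    fn add_read(&mut self, durability: Durability, revision: Revision) {
        // A query is only as durable as its least-durable input...
        self.durability = self.durability.min(durability);
        // ...and it changed as recently as its most-recently-changed input.
        self.changed_at = self.changed_at.max(revision);
    }
}

fn main() {
    let mut q = Summary { durability: Durability(2), changed_at: Revision(5) };
    q.add_read(Durability(1), Revision(9)); // low-durability input that changed recently
    assert_eq!((q.durability, q.changed_at), (Durability(1), Revision(9)));
}
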
79 changes: 50 additions & 29 deletions src/cycle.rs
@@ -50,7 +50,6 @@
 //! for each iteration of the outer cycle.
 
 use crate::key::DatabaseKeyIndex;
-use rustc_hash::FxHashSet;
 
 /// The maximum number of times we'll fixpoint-iterate before panicking.
 ///
@@ -91,7 +90,8 @@ pub enum CycleRecoveryStrategy {
 /// the cycle head(s) (can be plural in case of nested cycles) representing the cycles it is part
 /// of. This struct tracks these cycle heads.
 #[derive(Clone, Debug, Default)]
-pub(crate) struct CycleHeads(Option<Box<FxHashSet<DatabaseKeyIndex>>>);
+#[allow(clippy::box_collection)]
+pub struct CycleHeads(Option<Box<Vec<DatabaseKeyIndex>>>);
 
 impl CycleHeads {
     pub(crate) fn is_empty(&self) -> bool {
@@ -105,52 +105,62 @@ impl CycleHeads {
     }
 
     pub(crate) fn remove(&mut self, value: &DatabaseKeyIndex) -> bool {
-        if let Some(cycle_heads) = self.0.as_mut() {
-            let found = cycle_heads.remove(value);
-            if found && cycle_heads.is_empty() {
-                self.0.take();
-            }
-            found
-        } else {
-            false
-        }
-    }
-}
-
-impl std::iter::Extend<DatabaseKeyIndex> for CycleHeads {
-    fn extend<T: IntoIterator<Item = DatabaseKeyIndex>>(&mut self, iter: T) {
-        let mut iter = iter.into_iter();
-        if let Some(first) = iter.next() {
-            let heads = self.0.get_or_insert(Box::new(FxHashSet::default()));
-            heads.insert(first);
-            heads.extend(iter)
-        }
-    }
-}
+        let Some(cycle_heads) = &mut self.0 else {
+            return false;
+        };
+        let found = cycle_heads.iter().position(|&head| head == *value);
+        let Some(found) = found else { return false };
+        cycle_heads.swap_remove(found);
+        if cycle_heads.is_empty() {
+            self.0.take();
+        }
+        true
+    }
+
+    #[inline]
+    pub(crate) fn insert_into(self, cycle_heads: &mut Vec<DatabaseKeyIndex>) {
+        if let Some(heads) = self.0 {
+            for head in *heads {
+                if !cycle_heads.contains(&head) {
+                    cycle_heads.push(head);
+                }
+            }
+        }
+    }
+
+    pub(crate) fn extend(&mut self, other: &Self) {
+        if let Some(other) = &other.0 {
+            let heads = &mut **self.0.get_or_insert_with(|| Box::new(Vec::new()));
+            heads.reserve(other.len());
+            other.iter().for_each(|&head| {
+                if !heads.contains(&head) {
+                    heads.push(head);
+                }
+            });
+        }
+    }
+}
 
-impl std::iter::IntoIterator for CycleHeads {
+impl IntoIterator for CycleHeads {
     type Item = DatabaseKeyIndex;
-    type IntoIter = std::collections::hash_set::IntoIter<Self::Item>;
+    type IntoIter = <Vec<Self::Item> as IntoIterator>::IntoIter;
 
     fn into_iter(self) -> Self::IntoIter {
         self.0.map(|heads| *heads).unwrap_or_default().into_iter()
     }
 }
 
-// This type can be removed once MSRV is 1.83+ and we have Default for hashset iterators.
-pub(crate) struct CycleHeadsIter<'a>(
-    Option<std::collections::hash_set::Iter<'a, DatabaseKeyIndex>>,
-);
+pub struct CycleHeadsIter<'a>(std::slice::Iter<'a, DatabaseKeyIndex>);
 
 impl Iterator for CycleHeadsIter<'_> {
     type Item = DatabaseKeyIndex;
 
     fn next(&mut self) -> Option<Self::Item> {
-        self.0.as_mut()?.next().copied()
+        self.0.next().copied()
     }
 
     fn last(self) -> Option<Self::Item> {
-        self.0?.last().copied()
+        self.0.last().copied()
     }
 }

Review comment (Contributor), on the CycleHeadsIter definition:

    I don't know if you consider this an improvement, but we can eliminate CycleHeadsIter and implement IntoIterator for &'a CycleHeads like so:

    impl<'a> std::iter::IntoIterator for &'a CycleHeads {
        type Item = DatabaseKeyIndex;
        type IntoIter = std::iter::Copied<std::slice::Iter<'a, DatabaseKeyIndex>>;

        fn into_iter(self) -> Self::IntoIter {
            self.0
                .as_ref()
                .map(|heads| heads.iter())
                .unwrap_or_default()
                .copied()
        }
    }

Reply (Member, PR author):

    This causes new[InternedInput] to regress due to a report_tracked_read becoming more expensive, so something optimizes differently there 😕 Will drop our Extend impl as we can optimize around knowing the concrete iterator anyways.

@@ -161,12 +171,23 @@ impl<'a> std::iter::IntoIterator for &'a CycleHeads {
     type IntoIter = CycleHeadsIter<'a>;
 
     fn into_iter(self) -> Self::IntoIter {
-        CycleHeadsIter(self.0.as_ref().map(|heads| heads.iter()))
+        CycleHeadsIter(
+            self.0
+                .as_ref()
+                .map(|heads| heads.iter())
+                .unwrap_or_default(),
+        )
     }
 }
 
+impl From<DatabaseKeyIndex> for CycleHeads {
+    fn from(value: DatabaseKeyIndex) -> Self {
+        Self(Some(Box::new(vec![value])))
+    }
+}
+
-impl From<FxHashSet<DatabaseKeyIndex>> for CycleHeads {
-    fn from(value: FxHashSet<DatabaseKeyIndex>) -> Self {
+impl From<Vec<DatabaseKeyIndex>> for CycleHeads {
+    fn from(value: Vec<DatabaseKeyIndex>) -> Self {
         Self(if value.is_empty() {
             None
         } else {
@@ -175,4 +196,4 @@ impl From<FxHashSet<DatabaseKeyIndex>> for CycleHeads {
     }
 }
 
-pub(crate) static EMPTY_CYCLE_HEADS: CycleHeads = CycleHeads(None);
+pub(crate) const EMPTY_CYCLE_HEADS: CycleHeads = CycleHeads(None);
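
The representation change above trades hashing for linear scans: cycle-head sets are expected to stay tiny, so a Vec kept duplicate-free by a `contains` check before each push wins, and `swap_remove` gives O(1) removal because element order carries no meaning. A standalone sketch of the pattern (plain `u32` keys standing in for `DatabaseKeyIndex`):

/// A tiny "set" backed by a Vec; linear scans beat hashing at small sizes.
struct SmallSet(Vec<u32>);

impl SmallSet {
    fn insert(&mut self, value: u32) -> bool {
        // Dedup with a linear scan instead of a hash lookup.
        if self.0.contains(&value) {
            return false;
        }
        self.0.push(value);
        true
    }

    fn remove(&mut self, value: u32) -> bool {
        match self.0.iter().position(|&v| v == value) {
            Some(i) => {
                // Order is irrelevant, so O(1) swap_remove is fine.
                self.0.swap_remove(i);
                true
            }
            None => false,
        }
    }
}

fn main() {
    let mut set = SmallSet(Vec::new());
    assert!(set.insert(7));
    assert!(!set.insert(7)); // duplicate rejected by the scan
    assert!(set.remove(7));
    assert!(!set.remove(7));
}
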
39 changes: 22 additions & 17 deletions src/function/maybe_changed_after.rs
@@ -1,13 +1,12 @@
 use crate::{
     accumulator::accumulated_map::InputAccumulatedValues,
-    cycle::CycleRecoveryStrategy,
+    cycle::{CycleHeads, CycleRecoveryStrategy},
     key::DatabaseKeyIndex,
     table::sync::ClaimResult,
     zalsa::{MemoIngredientIndex, Zalsa, ZalsaDatabase},
     zalsa_local::{ActiveQueryGuard, QueryEdge, QueryOrigin},
     AsDynDatabase as _, Id, Revision,
 };
-use rustc_hash::FxHashSet;
 use std::sync::atomic::Ordering;
 
 use super::{memo::Memo, Configuration, IngredientImpl};
@@ -22,9 +21,9 @@ pub enum VerifyResult {
     /// The first inner value tracks whether the memo or any of its dependencies have an
     /// accumulated value.
     ///
-    /// Database keys in the hashset represent cycle heads encountered in validation; don't mark
+    /// The second is the cycle heads encountered in validation; don't mark
     /// memos verified until we've iterated the full cycle to ensure no inputs changed.
-    Unchanged(InputAccumulatedValues, FxHashSet<DatabaseKeyIndex>),
+    Unchanged(InputAccumulatedValues, CycleHeads),
 }
 
 impl VerifyResult {
@@ -37,7 +36,7 @@ impl VerifyResult {
     }
 
     pub(crate) fn unchanged() -> Self {
-        Self::Unchanged(InputAccumulatedValues::Empty, FxHashSet::default())
+        Self::Unchanged(InputAccumulatedValues::Empty, CycleHeads::default())
     }
 }
 
@@ -69,7 +68,7 @@ where
         } else {
             VerifyResult::Unchanged(
                 memo.revisions.accumulated_inputs.load(),
-                FxHashSet::default(),
+                CycleHeads::default(),
             )
         };
     }
@@ -112,7 +111,7 @@ where
                 CycleRecoveryStrategy::Fixpoint => {
                     return Some(VerifyResult::Unchanged(
                         InputAccumulatedValues::Empty,
-                        FxHashSet::from_iter([database_key_index]),
+                        CycleHeads::from(database_key_index),
                     ));
                 }
             },
@@ -158,7 +157,7 @@ where
                 Some(_) => InputAccumulatedValues::Any,
                 None => memo.revisions.accumulated_inputs.load(),
             },
-            FxHashSet::default(),
+            CycleHeads::default(),
         )
     });
 }
@@ -240,13 +239,12 @@ where
         zalsa: &Zalsa,
         memo: &Memo<C::Output<'_>>,
     ) -> bool {
-        for cycle_head in &memo.revisions.cycle_heads {
-            if zalsa
+        if (&memo.revisions.cycle_heads).into_iter().any(|cycle_head| {
+            zalsa
                 .lookup_ingredient(cycle_head.ingredient_index)
                 .is_provisional_cycle_head(db.as_dyn_database(), cycle_head.key_index)
-            {
-                return false;
-            }
+        }) {
+            return false;
         }
         // Relaxed is sufficient here because there are no other writes we need to ensure have
         // happened before marking this memo as verified-final.
@@ -282,7 +280,7 @@ where
         return VerifyResult::Changed;
     }
 
-    let mut cycle_heads = FxHashSet::default();
+    let mut cycle_heads = vec![];
     loop {
         let inputs = match &old_memo.revisions.origin {
             QueryOrigin::Assigned(_) => {
@@ -324,7 +322,7 @@ where
                 {
                     VerifyResult::Changed => return VerifyResult::Changed,
                     VerifyResult::Unchanged(input_accumulated, cycles) => {
-                        cycle_heads.extend(cycles);
+                        cycles.insert_into(&mut cycle_heads);
                         inputs |= input_accumulated;
                     }
                 }
@@ -384,7 +382,11 @@ where
             // from cycle heads. We will handle our own memo (and the rest of our cycle) on a
             // future iteration; first the outer cycle head needs to verify itself.
 
-            let in_heads = cycle_heads.remove(&database_key_index);
+            let in_heads = cycle_heads
+                .iter()
+                .position(|&head| head == database_key_index)
+                .inspect(|&head| _ = cycle_heads.swap_remove(head))
+                .is_some();
 
             if cycle_heads.is_empty() {
                 old_memo.mark_as_verified(db, zalsa.current_revision(), database_key_index, inputs);
@@ -398,7 +400,10 @@ where
                 continue;
             }
         }
-        return VerifyResult::Unchanged(InputAccumulatedValues::Empty, cycle_heads);
+        return VerifyResult::Unchanged(
+            InputAccumulatedValues::Empty,
+            CycleHeads::from(cycle_heads),
+        );
    }
 }
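
The `in_heads` computation above packs a remove-if-present check into a single chain: `position` finds the index, `Option::inspect` performs the `swap_remove` as a side effect, and `is_some` reports whether the key was present at all. A standalone demonstration of the idiom (`u32` standing in for `DatabaseKeyIndex`):

// Remove-if-present over a Vec, mirroring the `in_heads` chain above.
fn remove_if_present(heads: &mut Vec<u32>, key: u32) -> bool {
    heads
        .iter()
        .position(|&head| head == key)
        .inspect(|&index| _ = heads.swap_remove(index))
        .is_some()
}

fn main() {
    let mut heads = vec![10, 20, 30];
    assert!(remove_if_present(&mut heads, 20));
    assert_eq!(heads, [10, 30]); // swap_remove moved the last element into slot 1
    assert!(!remove_if_present(&mut heads, 99));
    assert_eq!(heads, [10, 30]);
}
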
49 changes: 26 additions & 23 deletions src/function/memo.rs
@@ -177,29 +177,32 @@ impl<V> Memo<V> {
         database_key_index: DatabaseKeyIndex,
     ) -> bool {
         let mut retry = false;
-        for head in self.cycle_heads() {
-            if head == database_key_index {
-                continue;
-            }
-            let ingredient = db.zalsa().lookup_ingredient(head.ingredient_index);
-            if !ingredient.is_provisional_cycle_head(db, head.key_index) {
-                // This cycle is already finalized, so we don't need to wait on it;
-                // keep looping through cycle heads.
-                retry = true;
-                continue;
-            }
-            if ingredient.wait_for(db, head.key_index) {
-                // There's a new memo available for the cycle head; fetch our own
-                // updated memo and see if it's still provisional or if the cycle
-                // has resolved.
-                retry = true;
-                continue;
-            } else {
-                // We hit a cycle blocking on the cycle head; this means it's in
-                // our own active query stack and we are responsible to resolve the
-                // cycle, so go ahead and return the provisional memo.
-                return false;
-            }
+        let hit_cycle = self
+            .cycle_heads()
+            .into_iter()
+            .filter(|&head| head != database_key_index)
+            .any(|head| {
+                let ingredient = db.zalsa().lookup_ingredient(head.ingredient_index);
+                if !ingredient.is_provisional_cycle_head(db, head.key_index) {
+                    // This cycle is already finalized, so we don't need to wait on it;
+                    // keep looping through cycle heads.
+                    retry = true;
+                    false
+                } else if ingredient.wait_for(db, head.key_index) {
+                    // There's a new memo available for the cycle head; fetch our own
+                    // updated memo and see if it's still provisional or if the cycle
+                    // has resolved.
+                    retry = true;
+                    false
+                } else {
+                    // We hit a cycle blocking on the cycle head; this means it's in
+                    // our own active query stack and we are responsible to resolve the
+                    // cycle, so go ahead and return the provisional memo.
+                    true
+                }
+            });
+        if hit_cycle {
+            return false;
         }
         // If `retry` is `true`, all our cycle heads (barring ourself) are complete; re-fetch
         // and we should get a non-provisional memo. If we get here and `retry` is still
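
The rewrite from a `for` loop to `Iterator::any` preserves the original control flow because `any` short-circuits on the first `true`, matching the old early `return false`, while side effects such as `retry = true` still run for every head inspected before that point. A standalone illustration of that short-circuiting (not Salsa code):

fn main() {
    let mut visited = Vec::new();
    let hit = [1, 2, 3, 4].into_iter().any(|n| {
        visited.push(n); // side effects before the match still happen (like `retry = true`)
        n == 3
    });
    assert!(hit);
    assert_eq!(visited, [1, 2, 3]); // 4 was never inspected: `any` stopped at 3
}
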
7 changes: 2 additions & 5 deletions src/input.rs
@@ -11,8 +11,7 @@ pub mod singleton;
 use input_field::FieldIngredientImpl;
 
 use crate::{
-    accumulator::accumulated_map::InputAccumulatedValues,
-    cycle::{CycleRecoveryStrategy, EMPTY_CYCLE_HEADS},
+    cycle::CycleRecoveryStrategy,
     function::VerifyResult,
     id::{AsId, FromIdWithDb},
     ingredient::{fmt_index, Ingredient},
@@ -178,12 +177,10 @@ impl<C: Configuration> IngredientImpl<C> {
         let id = id.as_id();
         let value = Self::data(zalsa, id);
         let stamp = &value.stamps[field_index];
-        zalsa_local.report_tracked_read(
+        zalsa_local.report_tracked_read_simple(
             InputDependencyIndex::new(field_ingredient_index, id),
             stamp.durability,
             stamp.changed_at,
-            InputAccumulatedValues::Empty,
-            &EMPTY_CYCLE_HEADS,
         );
         &value.fields
     }
6 changes: 1 addition & 5 deletions src/interned.rs
@@ -1,7 +1,5 @@
 use dashmap::SharedValue;
 
-use crate::accumulator::accumulated_map::InputAccumulatedValues;
-use crate::cycle::EMPTY_CYCLE_HEADS;
 use crate::durability::Durability;
 use crate::function::VerifyResult;
 use crate::ingredient::fmt_index;
@@ -180,12 +178,10 @@ where
         C::Fields<'db>: HashEqLike<Key>,
     {
         let zalsa_local = db.zalsa_local();
-        zalsa_local.report_tracked_read(
+        zalsa_local.report_tracked_read_simple(
            InputDependencyIndex::for_table(self.ingredient_index),
            Durability::MAX,
            self.reset_at,
-            InputAccumulatedValues::Empty,
-            &EMPTY_CYCLE_HEADS,
        );
 
        // Optimization to only get read lock on the map if the data has already been interned.
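
Both call sites above previously passed `InputAccumulatedValues::Empty` and `&EMPTY_CYCLE_HEADS` explicitly; the new `report_tracked_read_simple` entry point bakes those defaults in so these hot paths skip that bookkeeping. The helper's body lies outside this diff, but given `add_read_simple` in src/active_query.rs, a plausible shape is the following (hypothetical sketch, not the PR's actual code; it assumes `ZalsaLocal` exposes a `with_query_stack` accessor):

// Hypothetical sketch of the new ZalsaLocal helper: delegate straight to the
// `add_read_simple` fast path on the innermost active query, with no
// accumulated-values or cycle-head arguments to thread through.
pub(crate) fn report_tracked_read_simple(
    &self,
    input: InputDependencyIndex,
    durability: Durability,
    changed_at: Revision,
) {
    self.with_query_stack(|stack| {
        if let Some(top_query) = stack.last_mut() {
            top_query.add_read_simple(input, durability, changed_at);
        }
    });
}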