Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 47 additions & 29 deletions src/runtime/dependency_graph.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::pin::Pin;
use std::thread::ThreadId;

use parking_lot::MutexGuard;
use rustc_hash::FxHashMap;
use smallvec::SmallVec;

use crate::key::DatabaseKeyIndex;
use crate::runtime::dependency_graph::edge::EdgeCondvar;
use crate::runtime::WaitResult;

#[derive(Debug, Default)]
Expand Down Expand Up @@ -61,7 +63,12 @@ impl DependencyGraph {
to_id: ThreadId,
query_mutex_guard: QueryMutexGuard,
) -> WaitResult {
let edge = me.add_edge(from_id, database_key, to_id);
let cvar = std::pin::pin!(EdgeCondvar::default());
let cvar = cvar.as_ref();
// SAFETY: We are blocking until the result is removed from `DependencyGraph::wait_results`
// at which point the `edge` won't signal the condvar anymore.
// As such we are keeping the cond var alive until the reference in the edge drops.
unsafe { me.add_edge(from_id, database_key, to_id, cvar) };

// Release the mutex that prevents `database_key`
// from completing, now that the edge has been added.
Expand All @@ -72,29 +79,35 @@ impl DependencyGraph {
debug_assert!(!me.edges.contains_key(&from_id));
return result;
}
edge.wait(&mut me);
cvar.wait(&mut me);
}
}

/// Helper for `block_on`: performs actual graph modification
/// to add a dependency edge from `from_id` to `to_id`, which is
/// computing `database_key`.
fn add_edge(
///
/// # Safety
///
/// The caller needs to keep the referent of `cvar` alive until the corresponding
/// [`Self::wait_results`] entry has been inserted.
unsafe fn add_edge(
&mut self,
from_id: ThreadId,
database_key: DatabaseKeyIndex,
to_id: ThreadId,
) -> edge::EdgeGuard {
cvar: Pin<&EdgeCondvar>,
) {
assert_ne!(from_id, to_id);
debug_assert!(!self.edges.contains_key(&from_id));
debug_assert!(!self.depends_on(to_id, from_id));
let (edge, guard) = edge::Edge::new(to_id);
// SAFETY: The caller is responsible for ensuring that the `EdgeGuard` outlives the `Edge`.
let edge = unsafe { edge::Edge::new(to_id, cvar) };
self.edges.insert(from_id, edge);
self.query_dependents
.entry(database_key)
.or_default()
.push(from_id);
guard
}

/// Invoked when runtime `to_id` completes executing
Expand Down Expand Up @@ -128,45 +141,50 @@ impl DependencyGraph {
}

mod edge {
use std::sync::Arc;
use std::thread::ThreadId;
use std::{pin::Pin, thread::ThreadId};

use parking_lot::Condvar;

use parking_lot::MutexGuard;
#[derive(Default, Debug)]
pub(super) struct EdgeCondvar {
condvar: Condvar,
_phantom_pin: std::marker::PhantomPinned,
}

use crate::runtime::dependency_graph::DependencyGraph;
impl EdgeCondvar {
#[inline]
pub(super) fn wait<T: ?Sized>(&self, mutex_guard: &mut parking_lot::MutexGuard<'_, T>) {
self.condvar.wait(mutex_guard)
}
}

#[derive(Debug)]
pub(super) struct Edge {
pub(super) blocked_on_id: ThreadId,

/// Signalled whenever a query with dependents completes.
/// Allows those dependents to check if they are ready to unblock.
condvar: Arc<parking_lot::Condvar>,
}

pub struct EdgeGuard {
condvar: Arc<parking_lot::Condvar>,
}

impl EdgeGuard {
pub fn wait(&self, mutex_guard: &mut MutexGuard<'_, DependencyGraph>) {
self.condvar.wait(mutex_guard)
}
// condvar: unsafe<'stack_frame> Pin<&'stack_frame parking_lot::Condvar>,
condvar: Pin<&'static EdgeCondvar>,
}

impl Edge {
pub(super) fn new(blocked_on_id: ThreadId) -> (Self, EdgeGuard) {
let condvar = Arc::new(parking_lot::Condvar::new());
let edge = Self {
/// # SAFETY
///
/// The caller must ensure that the [`EdgeCondvar`] is kept alive until the [`Edge`] is dropped.
pub(super) unsafe fn new(blocked_on_id: ThreadId, condvar: Pin<&EdgeCondvar>) -> Self {
Self {
blocked_on_id,
condvar: condvar.clone(),
};
let edge_guard = EdgeGuard { condvar };
(edge, edge_guard)
// SAFETY: The caller is responsible for ensuring that the `EdgeCondvar` outlives the `Edge`.
condvar: unsafe {
std::mem::transmute::<Pin<&EdgeCondvar>, Pin<&'static EdgeCondvar>>(condvar)
},
}
}

#[inline]
pub(super) fn notify(self) {
self.condvar.notify_one();
self.condvar.condvar.notify_one();
}
}
}