Try to integrate fork of Chili parallel runtime #140206


Draft: wants to merge 5 commits into base: master
185 changes: 72 additions & 113 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion compiler/rustc_data_structures/Cargo.toml
@@ -14,7 +14,7 @@ indexmap = "2.4.0"
jobserver_crate = { version = "0.1.28", package = "jobserver" }
measureme = "12.0.1"
rustc-hash = "2.0.0"
rustc-rayon-core = { version = "0.5.0" }
chili = { git = "https://github.com/zetanumbers/chili.git", branch = "rustc" }
rustc-stable-hash = { version = "0.1.0", features = ["nightly"] }
rustc_arena = { path = "../rustc_arena" }
rustc_graphviz = { path = "../rustc_graphviz" }
2 changes: 1 addition & 1 deletion compiler/rustc_data_structures/src/sync.rs
@@ -43,7 +43,7 @@ pub use self::freeze::{FreezeLock, FreezeReadGuard, FreezeWriteGuard};
pub use self::lock::{Lock, LockGuard, Mode};
pub use self::mode::{is_dyn_thread_safe, set_dyn_thread_safe_mode};
pub use self::parallel::{
join, par_for_each_in, par_map, parallel_guard, scope, try_par_for_each_in,
join, join4, par_for_each_in, par_map, parallel_guard, try_par_for_each_in,
};
pub use self::vec::{AppendOnlyIndexVec, AppendOnlyVec};
pub use self::worker_local::{Registry, WorkerLocal};
113 changes: 65 additions & 48 deletions compiler/rustc_data_structures/src/sync/parallel.rs
@@ -58,49 +58,58 @@ where
(a.unwrap(), b.unwrap())
}

/// Runs a list of blocks in parallel. The first block is executed immediately on
/// the current thread. Use that for the longest running block.
#[macro_export]
macro_rules! parallel {
(impl $fblock:block [$($c:expr,)*] [$block:expr $(, $rest:expr)*]) => {
parallel!(impl $fblock [$block, $($c,)*] [$($rest),*])
};
(impl $fblock:block [$($blocks:expr,)*] []) => {
$crate::sync::parallel_guard(|guard| {
$crate::sync::scope(|s| {
$(
let block = $crate::sync::FromDyn::from(|| $blocks);
s.spawn(move |_| {
guard.run(move || block.into_inner()());
});
)*
guard.run(|| $fblock);
});
});
};
($fblock:block, $($blocks:block),*) => {
if $crate::sync::is_dyn_thread_safe() {
// Reverse the order of the later blocks since Rayon executes them in reverse order
// when using a single thread. This ensures the execution order matches that
// of a single threaded rustc.
parallel!(impl $fblock [] [$($blocks),*]);
} else {
$crate::sync::parallel_guard(|guard| {
guard.run(|| $fblock);
$(guard.run(|| $blocks);)*
});
}
};
}

// This function only works when `mode::is_dyn_thread_safe()`.
pub fn scope<'scope, OP, R>(op: OP) -> R
pub fn join4<F0, F1, F2, F3, R0, R1, R2, R3>(
oper0: F0,
oper1: F1,
oper2: F2,
oper3: F3,
) -> (R0, R1, R2, R3)
where
OP: FnOnce(&rayon_core::Scope<'scope>) -> R + DynSend,
R: DynSend,
F0: FnOnce() -> R0 + DynSend,
F1: FnOnce() -> R1 + DynSend,
F2: FnOnce() -> R2 + DynSend,
F3: FnOnce() -> R3 + DynSend,
R0: DynSend,
R1: DynSend,
R2: DynSend,
R3: DynSend,
{
let op = FromDyn::from(op);
rayon_core::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner()
if mode::is_dyn_thread_safe() {
let oper0 = FromDyn::from(oper0);
let oper1 = FromDyn::from(oper1);
let oper2 = FromDyn::from(oper2);
let oper3 = FromDyn::from(oper3);
// Swap closures around because Chili executes second one on the current thread
let (r1, (r2, (r3, r0))) = parallel_guard(|guard| {
chili::Scope::with_current(|scope| {
scope.unwrap().join_with_heartbeat_every::<1, _, _, _, _>(
move |_| guard.run(move || FromDyn::from(oper1.into_inner()())),
move |scope| {
scope.join_with_heartbeat_every::<1, _, _, _, _>(
move |_| guard.run(move || FromDyn::from(oper2.into_inner()())),
move |scope| {
scope.join_with_heartbeat_every::<1, _, _, _, _>(
move |_| guard.run(move || FromDyn::from(oper3.into_inner()())),
move |_| guard.run(move || FromDyn::from(oper0.into_inner()())),
)
},
)
},
)
})
});
(
r0.unwrap().into_inner(),
r1.unwrap().into_inner(),
r2.unwrap().into_inner(),
r3.unwrap().into_inner(),
)
} else {
let (r0, r1, r2, r3) = parallel_guard(|guard| {
(guard.run(oper0), guard.run(oper1), guard.run(oper2), guard.run(oper3))
});
(r0.unwrap(), r1.unwrap(), r2.unwrap(), r3.unwrap())
}
}

#[inline]
@@ -112,11 +121,14 @@ where
if mode::is_dyn_thread_safe() {
let oper_a = FromDyn::from(oper_a);
let oper_b = FromDyn::from(oper_b);
let (a, b) = parallel_guard(|guard| {
rayon_core::join(
move || guard.run(move || FromDyn::from(oper_a.into_inner()())),
move || guard.run(move || FromDyn::from(oper_b.into_inner()())),
)
let (b, a) = parallel_guard(|guard| {
chili::Scope::with_current(|scope| {
scope.unwrap().join_with_heartbeat_every::<1, _, _, _, _>(
// Swap arguments around because Chili executes second one on the current thread
move |_| guard.run(move || FromDyn::from(oper_b.into_inner()())),
move |_| guard.run(move || FromDyn::from(oper_a.into_inner()())),
)
})
});
(a.unwrap().into_inner(), b.unwrap().into_inner())
} else {
@@ -136,6 +148,7 @@ fn par_slice<I: DynSend>(
}

fn par_rec<I: DynSend, F: Fn(&mut I) + DynSync + DynSend>(
scope: &mut chili::Scope<'_>,
items: &mut [I],
state: &State<'_, F>,
) {
@@ -147,7 +160,11 @@ fn par_slice<I: DynSend>(
let (left, right) = items.split_at_mut(items.len() / 2);
let mut left = state.for_each.derive(left);
let mut right = state.for_each.derive(right);
rayon_core::join(move || par_rec(*left, state), move || par_rec(*right, state));
scope.join(
// Swap arguments around because Chili executes second one on the current thread
move |scope| par_rec(scope, *right, state),
move |scope| par_rec(scope, *left, state),
);
}
}

@@ -156,7 +173,7 @@ fn par_slice<I: DynSend>(
guard,
group: std::cmp::max(items.len() / 128, 1),
};
par_rec(items, &state)
chili::Scope::with_current(|scope| par_rec(&mut scope.unwrap(), items, &state));
}

pub fn par_for_each_in<I: DynSend, T: IntoIterator<Item = I>>(
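
Chili exposes only a binary fork-join `join(a, b)` and runs the *second* closure on the calling thread, keeping the first one available for stealing. That is why every call in this diff swaps its arguments and why `join4` destructures `(r1, (r2, (r3, r0)))`. Below is a minimal, purely sequential sketch — the stand-in `join` here is not the chili API, it only models the "second closure runs on the current thread" rule — showing how the nesting keeps the first block running immediately on the calling thread, matching the contract of the removed `parallel!` macro:

```rust
fn join<A, B, RA, RB>(a: A, b: B) -> (RA, RB)
where
    A: FnOnce() -> RA,
    B: FnOnce() -> RB,
{
    // A real runtime would offer `a` to other workers; the calling thread runs `b`.
    let rb = b();
    let ra = a();
    (ra, rb)
}

fn join4<R0, R1, R2, R3>(
    f0: impl FnOnce() -> R0,
    f1: impl FnOnce() -> R1,
    f2: impl FnOnce() -> R2,
    f3: impl FnOnce() -> R3,
) -> (R0, R1, R2, R3) {
    // Nest binary joins and swap arguments so `f0` ends up on the innermost
    // "current thread" side; un-swap while destructuring the results.
    let (r1, (r2, (r3, r0))) = join(f1, || join(f2, || join(f3, f0)));
    (r0, r1, r2, r3)
}

fn main() {
    use std::cell::RefCell;

    let order = RefCell::new(Vec::new());
    let (r0, r1, r2, r3) = join4(
        || { order.borrow_mut().push("f0"); 0 },
        || { order.borrow_mut().push("f1"); 1 },
        || { order.borrow_mut().push("f2"); 2 },
        || { order.borrow_mut().push("f3"); 3 },
    );
    assert_eq!((r0, r1, r2, r3), (0, 1, 2, 3));
    // With nothing stolen, the first block runs first on the calling thread.
    assert_eq!(*order.borrow(), ["f0", "f3", "f2", "f1"]);
}
```

Under work stealing the outer closures may be picked up by other workers, so only the position of the first block is guaranteed — the same guarantee the old macro documented ("use that for the longest running block").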
2 changes: 1 addition & 1 deletion compiler/rustc_interface/Cargo.toml
@@ -5,7 +5,7 @@ edition = "2024"

[dependencies]
# tidy-alphabetical-start
rustc-rayon-core = { version = "0.5.0" }
chili = { git = "https://github.com/zetanumbers/chili.git", branch = "rustc" }
rustc_ast = { path = "../rustc_ast" }
rustc_ast_lowering = { path = "../rustc_ast_lowering" }
rustc_ast_passes = { path = "../rustc_ast_passes" }
35 changes: 17 additions & 18 deletions compiler/rustc_interface/src/passes.rs
@@ -7,9 +7,8 @@ use std::{env, fs, iter};

use rustc_ast as ast;
use rustc_codegen_ssa::traits::CodegenBackend;
use rustc_data_structures::parallel;
use rustc_data_structures::steal::Steal;
use rustc_data_structures::sync::{AppendOnlyIndexVec, FreezeLock, WorkerLocal};
use rustc_data_structures::sync::{AppendOnlyIndexVec, FreezeLock, WorkerLocal, join, join4};
use rustc_expand::base::{ExtCtxt, LintStoreExpand};
use rustc_feature::Features;
use rustc_fs_util::try_canonicalize;
@@ -902,8 +901,8 @@ fn run_required_analyses(tcx: TyCtxt<'_>) {
rustc_passes::hir_id_validator::check_crate(tcx);
let sess = tcx.sess;
sess.time("misc_checking_1", || {
parallel!(
{
join4(
|| {
sess.time("looking_for_entry_point", || tcx.ensure_ok().entry_fn(()));

sess.time("looking_for_derive_registrar", || {
@@ -912,27 +911,27 @@ fn run_required_analyses(tcx: TyCtxt<'_>) {

CStore::from_tcx(tcx).report_unused_deps(tcx);
},
{
|| {
tcx.par_hir_for_each_module(|module| {
tcx.ensure_ok().check_mod_loops(module);
tcx.ensure_ok().check_mod_attrs(module);
tcx.ensure_ok().check_mod_naked_functions(module);
tcx.ensure_ok().check_mod_unstable_api_usage(module);
});
},
{
|| {
sess.time("unused_lib_feature_checking", || {
rustc_passes::stability::check_unused_or_stable_features(tcx)
});
},
{
|| {
// We force these queries to run,
// since they might not otherwise get called.
// This marks the corresponding crate-level attributes
// as used, and ensures that their values are valid.
tcx.ensure_ok().limits(());
tcx.ensure_ok().stability_index(());
}
},
);
});

@@ -1027,36 +1026,36 @@ fn analysis(tcx: TyCtxt<'_>, (): ()) {
}

sess.time("misc_checking_3", || {
parallel!(
{
join(
|| {
tcx.ensure_ok().effective_visibilities(());

parallel!(
{
join4(
|| {
tcx.ensure_ok().check_private_in_public(());
},
{
|| {
tcx.par_hir_for_each_module(|module| {
tcx.ensure_ok().check_mod_deathness(module)
});
},
{
|| {
sess.time("lint_checking", || {
rustc_lint::check_crate(tcx);
});
},
{
|| {
tcx.ensure_ok().clashing_extern_declarations(());
}
},
);
},
{
|| {
sess.time("privacy_checking_modules", || {
tcx.par_hir_for_each_module(|module| {
tcx.ensure_ok().check_mod_privacy(module);
});
});
}
},
);

// This check has to be run after all lints are done processing. We don't
60 changes: 33 additions & 27 deletions compiler/rustc_interface/src/util.rs
@@ -1,4 +1,5 @@
use std::env::consts::{DLL_PREFIX, DLL_SUFFIX};
use std::num::NonZero;
use std::path::{Path, PathBuf};
use std::sync::OnceLock;
use std::sync::atomic::{AtomicBool, Ordering};
@@ -190,17 +191,17 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
let current_gcx = FromDyn::from(CurrentGcx::new());
let current_gcx2 = current_gcx.clone();

let builder = rayon_core::ThreadPoolBuilder::new()
.thread_name(|_| "rustc".to_string())
.acquire_thread_handler(jobserver::acquire_thread)
.release_thread_handler(jobserver::release_thread)
.num_threads(threads)
.deadlock_handler(move || {
let config = chili::Config {
// .thread_name(|_| "rustc".to_string())
acquire_thread_handler: Some(Box::new(jobserver::acquire_thread)),
release_thread_handler: Some(Box::new(jobserver::release_thread)),
thread_count: NonZero::new(threads),
stack_size: NonZero::new(thread_stack_size),
deadlock_handler: Some(Box::new(move || {
// On deadlock, creates a new thread and forwards information in thread
// locals to it. The new thread runs the deadlock handler.

let current_gcx2 = current_gcx2.clone();
let registry = rayon_core::Registry::current();
let session_globals = rustc_span::with_session_globals(|session_globals| {
session_globals as *const SessionGlobals as usize
});
@@ -226,16 +227,19 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
// We need the complete map to ensure we find a cycle to break.
QueryCtxt::new(tcx).collect_active_jobs().ok().expect("failed to collect active queries in deadlock handler")
});
break_query_cycles(query_map, &registry);
break_query_cycles(query_map);
})
})
});

on_panic.disable();
})
.unwrap();
})
.stack_size(thread_stack_size);
})),
// TODO: Tune heartbeat_interval
// heartbeat_interval: Duration::from_micros(100),
..Default::default()
};

// We create the session globals on the main thread, then create the thread
// pool. Upon creation, each worker thread created gets a copy of the
@@ -244,23 +248,25 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
rustc_span::create_session_globals_then(edition, extra_symbols, Some(sm_inputs), || {
rustc_span::with_session_globals(|session_globals| {
let session_globals = FromDyn::from(session_globals);
builder
.build_scoped(
// Initialize each new worker thread when created.
move |thread: rayon_core::ThreadBuilder| {
// Register the thread for use with the `WorkerLocal` type.
registry.register();

rustc_span::set_session_globals_then(session_globals.into_inner(), || {
thread.run()
})
},
// Run `f` on the first thread in the thread pool.
move |pool: &rayon_core::ThreadPool| {
pool.install(|| f(current_gcx.into_inner()))
},
)
.unwrap()
chili::ThreadPool::scoped_global(
config,
// Initialize each new worker thread when created.
move |thread: chili::ThreadBuilder| {
// Register the thread for use with the `WorkerLocal` type.
registry.register();

rustc_span::set_session_globals_then(session_globals.into_inner(), || {
thread.run()
})
},
// Run `f` on the first thread in the thread pool.
move || {
chili::Scope::with_current(|scope| {
scope.unwrap().install(|_| f(current_gcx.into_inner()))
})
},
)
.unwrap()
})
})
}
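
Here the rayon thread-pool builder's chained setters become plain fields on a config value handed to `chili::ThreadPool::scoped_global`. A rough sketch of that shape only — the field names mirror the diff, but the struct and types below are assumptions for illustration, not the fork's real definitions:

```rust
use std::num::NonZero;

type Handler = Box<dyn Fn() + Send + Sync>;

#[allow(dead_code)]
struct PoolConfig {
    acquire_thread_handler: Option<Handler>,
    release_thread_handler: Option<Handler>,
    thread_count: Option<NonZero<usize>>,
    stack_size: Option<NonZero<usize>>,
    deadlock_handler: Option<Handler>,
}

fn main() {
    let config = PoolConfig {
        acquire_thread_handler: Some(Box::new(|| println!("acquire a jobserver token"))),
        release_thread_handler: Some(Box::new(|| println!("release a jobserver token"))),
        thread_count: NonZero::new(8),
        stack_size: NonZero::new(8 * 1024 * 1024),
        deadlock_handler: Some(Box::new(|| println!("collect active queries, break cycles"))),
    };
    // A real pool would consume `config` when it is built; this only shows the shape.
    let _ = config.thread_count;
}
```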
2 changes: 1 addition & 1 deletion compiler/rustc_middle/Cargo.toml
@@ -9,7 +9,7 @@ bitflags = "2.4.1"
either = "1.5.0"
gsgdt = "0.1.2"
polonius-engine = "0.13.0"
rustc-rayon-core = { version = "0.5.0" }
chili = { git = "https://github.com/zetanumbers/chili.git", branch = "rustc" }
rustc_abi = { path = "../rustc_abi" }
rustc_apfloat = "0.2.0"
rustc_arena = { path = "../rustc_arena" }
2 changes: 1 addition & 1 deletion compiler/rustc_middle/src/ty/context/tls.rs
@@ -36,7 +36,7 @@ impl<'a, 'tcx> ImplicitCtxt<'a, 'tcx> {
}

// Import the thread-local variable from Rayon, which is preserved for Rayon jobs.
use rayon_core::tlv::TLV;
use chili::tlv::TLV;

#[inline]
fn erase(context: &ImplicitCtxt<'_, '_>) -> *const () {
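
The only change in this file is where the job-preserved thread-local comes from. The underlying pattern: the current `ImplicitCtxt` is stored as a type-erased pointer in a thread-local slot that the parallel runtime copies into every job it spawns, so queries running on worker threads see the caller's context. A self-contained sketch of that pattern — plain `thread_local!`, with a hypothetical `ImplicitCtxt` and `enter_context`; not the chili `tlv` module, which additionally forwards the slot across job boundaries:

```rust
use std::cell::Cell;

thread_local! {
    // Stand-in for `tlv::TLV`: a per-thread slot holding the erased context pointer.
    static TLV: Cell<*const ()> = Cell::new(std::ptr::null());
}

struct ImplicitCtxt {
    query_depth: usize,
}

fn erase(context: &ImplicitCtxt) -> *const () {
    context as *const ImplicitCtxt as *const ()
}

/// Install `context` in the thread-local slot while running `f`, then restore
/// the previous value. The runtime's job is to copy this slot into spawned
/// tasks so they observe the same context.
fn enter_context<R>(context: &ImplicitCtxt, f: impl FnOnce() -> R) -> R {
    TLV.with(|tlv| {
        let old = tlv.replace(erase(context));
        let result = f();
        tlv.set(old);
        result
    })
}

fn main() {
    let cx = ImplicitCtxt { query_depth: 0 };
    let depth = enter_context(&cx, || {
        // Inside a job the pointer would be read back and un-erased.
        assert!(!TLV.with(|tlv| tlv.get()).is_null());
        cx.query_depth
    });
    assert_eq!(depth, 0);
}
```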
2 changes: 1 addition & 1 deletion compiler/rustc_query_system/Cargo.toml
@@ -6,7 +6,7 @@ edition = "2024"
[dependencies]
# tidy-alphabetical-start
parking_lot = "0.12"
rustc-rayon-core = { version = "0.5.0" }
chili = { git = "https://github.com/zetanumbers/chili.git", branch = "rustc" }
rustc_abi = { path = "../rustc_abi" }
rustc_ast = { path = "../rustc_ast" }
rustc_attr_data_structures = { path = "../rustc_attr_data_structures" }
12 changes: 4 additions & 8 deletions compiler/rustc_query_system/src/query/job.rs
@@ -236,7 +236,7 @@ impl<I> QueryLatch<I> {
// If this detects a deadlock and the deadlock handler wants to resume this thread
// we have to be in the `wait` call. This is ensured by the deadlock handler
// getting the self.info lock.
rayon_core::mark_blocked();
chili::mark_blocked().unwrap();
jobserver::release_thread();
waiter.condvar.wait(&mut info);
// Release the lock before we potentially block in `acquire_thread`
@@ -250,9 +250,8 @@ impl<I> QueryLatch<I> {
let mut info = self.info.lock();
debug_assert!(!info.complete);
info.complete = true;
let registry = rayon_core::Registry::current();
for waiter in info.waiters.drain(..) {
rayon_core::mark_unblocked(&registry);
chili::mark_unblocked().unwrap();
waiter.condvar.notify_one();
}
}
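
`mark_blocked`/`mark_unblocked` are the latch's way of telling the runtime how many workers are still runnable, so a deadlock — every worker parked on a query latch — can be detected and the cycle broken. A toy model of that accounting (an assumed counter-based scheme for illustration; chili's real bookkeeping lives inside the runtime):

```rust
// Illustrative model, not the chili API.
use std::sync::atomic::{AtomicUsize, Ordering};

struct Registry {
    active_threads: AtomicUsize,
}

impl Registry {
    fn mark_blocked(&self) {
        // One fewer worker is making progress; if this hits zero, every worker
        // is waiting on a latch and the deadlock handler must break a cycle.
        if self.active_threads.fetch_sub(1, Ordering::SeqCst) == 1 {
            println!("all workers blocked: run deadlock handler");
        }
    }

    fn mark_unblocked(&self) {
        // Called before notifying a waiter, so the count never under-reports
        // runnable threads while wakeups are still in flight.
        self.active_threads.fetch_add(1, Ordering::SeqCst);
    }
}

fn main() {
    let registry = Registry { active_threads: AtomicUsize::new(1) };
    registry.mark_blocked();   // would normally precede a condvar wait
    registry.mark_unblocked(); // would normally precede notify_one
}
```

The comment in `break_query_cycles` about marking threads unblocked before waking them is exactly about keeping this count consistent while wakeups are still in flight.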
@@ -504,10 +503,7 @@ fn remove_cycle<I: Clone>(
/// uses a query latch and then resuming that waiter.
/// There may be multiple cycles involved in a deadlock, so this searches
/// all active queries for cycles before finally resuming all the waiters at once.
pub fn break_query_cycles<I: Clone + Debug>(
query_map: QueryMap<I>,
registry: &rayon_core::Registry,
) {
pub fn break_query_cycles<I: Clone + Debug>(query_map: QueryMap<I>) {
let mut wakelist = Vec::new();
let mut jobs: Vec<QueryJobId> = query_map.keys().cloned().collect();

@@ -538,7 +534,7 @@ pub fn break_query_cycles<I: Clone + Debug>(
// we wake the threads up as otherwise Rayon could detect a deadlock if a thread we
// resumed fell asleep and this thread had yet to mark the remaining threads as unblocked.
for _ in 0..wakelist.len() {
rayon_core::mark_unblocked(registry);
chili::mark_unblocked().unwrap();
}

for waiter in wakelist.into_iter() {