mod topologic_sort;
use std::time::Duration;
use hir::{db::DefDatabase, Symbol};
use itertools::Itertools;
use crate::{
base_db::{
ra_salsa::{Database, ParallelDatabase, Snapshot},
Cancelled, CrateId, SourceDatabase,
},
symbol_index::SymbolsDatabase,
FxIndexMap, RootDatabase,
};
#[derive(Debug)]
pub struct ParallelPrimeCachesProgress {
pub crates_currently_indexing: Vec<Symbol>,
pub crates_total: usize,
pub crates_done: usize,
pub work_type: &'static str,
}
pub fn parallel_prime_caches(
db: &RootDatabase,
num_worker_threads: usize,
cb: &(dyn Fn(ParallelPrimeCachesProgress) + Sync),
) {
let _p = tracing::info_span!("parallel_prime_caches").entered();
let graph = db.crate_graph();
let mut crates_to_prime = {
let mut builder = topologic_sort::TopologicalSortIter::builder();
for crate_id in graph.iter() {
builder.add(crate_id, graph[crate_id].dependencies.iter().map(|d| d.crate_id));
}
builder.build()
};
enum ParallelPrimeCacheWorkerProgress {
BeginCrate { crate_id: CrateId, crate_name: Symbol },
EndCrate { crate_id: CrateId },
}
#[derive(PartialOrd, Ord, PartialEq, Eq, Copy, Clone)]
enum PrimingPhase {
DefMap,
ImportMap,
CrateSymbols,
}
let (work_sender, progress_receiver) = {
let (progress_sender, progress_receiver) = crossbeam_channel::unbounded();
let (work_sender, work_receiver) = crossbeam_channel::unbounded();
let prime_caches_worker = move |db: Snapshot<RootDatabase>| {
while let Ok((crate_id, crate_name, kind)) = work_receiver.recv() {
progress_sender
.send(ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name })?;
match kind {
PrimingPhase::DefMap => _ = db.crate_def_map(crate_id),
PrimingPhase::ImportMap => _ = db.import_map(crate_id),
PrimingPhase::CrateSymbols => _ = db.crate_symbols(crate_id.into()),
}
progress_sender.send(ParallelPrimeCacheWorkerProgress::EndCrate { crate_id })?;
}
Ok::<_, crossbeam_channel::SendError<_>>(())
};
for id in 0..num_worker_threads {
let worker = prime_caches_worker.clone();
let db = db.snapshot();
stdx::thread::Builder::new(stdx::thread::ThreadIntent::Worker)
.allow_leak(true)
.name(format!("PrimeCaches#{id}"))
.spawn(move || Cancelled::catch(|| worker(db)))
.expect("failed to spawn thread");
}
(work_sender, progress_receiver)
};
let crates_total = crates_to_prime.pending();
let mut crates_done = 0;
let mut crates_currently_indexing =
FxIndexMap::with_capacity_and_hasher(num_worker_threads, Default::default());
let mut additional_phases = vec![];
while crates_done < crates_total {
db.unwind_if_cancelled();
for crate_id in &mut crates_to_prime {
let krate = &graph[crate_id];
let name = krate
.display_name
.as_deref()
.cloned()
.unwrap_or_else(|| Symbol::integer(crate_id.into_raw().into_u32() as usize));
if krate.origin.is_lang() {
additional_phases.push((crate_id, name.clone(), PrimingPhase::ImportMap));
} else if krate.origin.is_local() {
additional_phases.push((crate_id, name.clone(), PrimingPhase::CrateSymbols));
}
work_sender.send((crate_id, name, PrimingPhase::DefMap)).ok();
}
let worker_progress = match progress_receiver.recv_timeout(Duration::from_millis(10)) {
Ok(p) => p,
Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
continue;
}
Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
db.unwind_if_cancelled();
break;
}
};
match worker_progress {
ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name } => {
crates_currently_indexing.insert(crate_id, crate_name);
}
ParallelPrimeCacheWorkerProgress::EndCrate { crate_id } => {
crates_currently_indexing.swap_remove(&crate_id);
crates_to_prime.mark_done(crate_id);
crates_done += 1;
}
};
let progress = ParallelPrimeCachesProgress {
crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
crates_done,
crates_total,
work_type: "Indexing",
};
cb(progress);
}
let mut crates_done = 0;
let crates_total = additional_phases.len();
for w in additional_phases.into_iter().sorted_by_key(|&(_, _, phase)| phase) {
work_sender.send(w).ok();
}
while crates_done < crates_total {
db.unwind_if_cancelled();
let worker_progress = match progress_receiver.recv_timeout(Duration::from_millis(10)) {
Ok(p) => p,
Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
continue;
}
Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
db.unwind_if_cancelled();
break;
}
};
match worker_progress {
ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name } => {
crates_currently_indexing.insert(crate_id, crate_name);
}
ParallelPrimeCacheWorkerProgress::EndCrate { crate_id } => {
crates_currently_indexing.swap_remove(&crate_id);
crates_done += 1;
}
};
let progress = ParallelPrimeCachesProgress {
crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
crates_done,
crates_total,
work_type: "Populating symbols",
};
cb(progress);
}
}