1use std::panic::AssertUnwindSafe;
6
7use hir::{Symbol, db::DefDatabase};
8use rustc_hash::FxHashMap;
9use salsa::{Cancelled, Database};
10
11use crate::{
12 FxIndexMap, RootDatabase,
13 base_db::{Crate, RootQueryDb},
14 symbol_index::SymbolsDatabase,
15};
16
17#[derive(Debug)]
19pub struct ParallelPrimeCachesProgress {
20 pub crates_currently_indexing: Vec<Symbol>,
22 pub crates_total: usize,
24 pub crates_done: usize,
26 pub work_type: &'static str,
27}
28
29pub fn parallel_prime_caches(
30 db: &RootDatabase,
31 num_worker_threads: usize,
32 cb: &(dyn Fn(ParallelPrimeCachesProgress) + Sync),
33) {
34 let _p = tracing::info_span!("parallel_prime_caches").entered();
35
36 enum ParallelPrimeCacheWorkerProgress {
37 BeginCrateDefMap { crate_id: Crate, crate_name: Symbol },
38 EndCrateDefMap { crate_id: Crate },
39 EndCrateImportMap,
40 EndModuleSymbols,
41 Cancelled(Cancelled),
42 }
43
44 let (reverse_deps, mut to_be_done_deps) = {
58 let all_crates = db.all_crates();
59 let to_be_done_deps = all_crates
60 .iter()
61 .map(|&krate| (krate, krate.data(db).dependencies.len() as u32))
62 .collect::<FxHashMap<_, _>>();
63 let mut reverse_deps =
64 all_crates.iter().map(|&krate| (krate, Vec::new())).collect::<FxHashMap<_, _>>();
65 for &krate in &*all_crates {
66 for dep in &krate.data(db).dependencies {
67 reverse_deps.get_mut(&dep.crate_id).unwrap().push(krate);
68 }
69 }
70 (reverse_deps, to_be_done_deps)
71 };
72
73 let (def_map_work_sender, import_map_work_sender, symbols_work_sender, progress_receiver) = {
74 let (progress_sender, progress_receiver) = crossbeam_channel::unbounded();
75 let (def_map_work_sender, def_map_work_receiver) = crossbeam_channel::unbounded();
76 let (import_map_work_sender, import_map_work_receiver) = crossbeam_channel::unbounded();
77 let (symbols_work_sender, symbols_work_receiver) = crossbeam_channel::unbounded();
78 let prime_caches_worker =
79 move |db: RootDatabase| {
80 let handle_def_map = |crate_id, crate_name| {
81 progress_sender.send(ParallelPrimeCacheWorkerProgress::BeginCrateDefMap {
82 crate_id,
83 crate_name,
84 })?;
85
86 let cancelled = Cancelled::catch(|| _ = hir::crate_def_map(&db, crate_id));
87
88 match cancelled {
89 Ok(()) => progress_sender
90 .send(ParallelPrimeCacheWorkerProgress::EndCrateDefMap { crate_id })?,
91 Err(cancelled) => progress_sender
92 .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
93 }
94
95 Ok::<_, crossbeam_channel::SendError<_>>(())
96 };
97 let handle_import_map = |crate_id| {
98 let cancelled = Cancelled::catch(|| _ = db.import_map(crate_id));
99
100 match cancelled {
101 Ok(()) => progress_sender
102 .send(ParallelPrimeCacheWorkerProgress::EndCrateImportMap)?,
103 Err(cancelled) => progress_sender
104 .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
105 }
106
107 Ok::<_, crossbeam_channel::SendError<_>>(())
108 };
109 let handle_symbols = |module| {
110 let cancelled =
111 Cancelled::catch(AssertUnwindSafe(|| _ = db.module_symbols(module)));
112
113 match cancelled {
114 Ok(()) => progress_sender
115 .send(ParallelPrimeCacheWorkerProgress::EndModuleSymbols)?,
116 Err(cancelled) => progress_sender
117 .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
118 }
119
120 Ok::<_, crossbeam_channel::SendError<_>>(())
121 };
122
123 loop {
124 db.unwind_if_revision_cancelled();
125
126 crossbeam_channel::select_biased! {
128 recv(def_map_work_receiver) -> work => {
129 let Ok((crate_id, crate_name)) = work else { break };
130 handle_def_map(crate_id, crate_name)?;
131 }
132 recv(import_map_work_receiver) -> work => {
133 let Ok(crate_id) = work else { break };
134 handle_import_map(crate_id)?;
135 }
136 recv(symbols_work_receiver) -> work => {
137 let Ok(module) = work else { break };
138 handle_symbols(module)?;
139 }
140 }
141 }
142 Ok::<_, crossbeam_channel::SendError<_>>(())
143 };
144
145 for id in 0..num_worker_threads {
146 stdx::thread::Builder::new(
147 stdx::thread::ThreadIntent::Worker,
148 format!("PrimeCaches#{id}"),
149 )
150 .allow_leak(true)
151 .spawn({
152 let worker = prime_caches_worker.clone();
153 let db = db.clone();
154 move || worker(db)
155 })
156 .expect("failed to spawn thread");
157 }
158
159 (def_map_work_sender, import_map_work_sender, symbols_work_sender, progress_receiver)
160 };
161
162 let crate_def_maps_total = db.all_crates().len();
163 let mut crate_def_maps_done = 0;
164 let (mut crate_import_maps_total, mut crate_import_maps_done) = (0usize, 0usize);
165 let (mut module_symbols_total, mut module_symbols_done) = (0usize, 0usize);
166
167 let mut crates_currently_indexing =
170 FxIndexMap::with_capacity_and_hasher(num_worker_threads, Default::default());
171
172 for (&krate, &to_be_done_deps) in &to_be_done_deps {
173 if to_be_done_deps != 0 {
174 continue;
175 }
176
177 let name = crate_name(db, krate);
178 def_map_work_sender.send((krate, name)).ok();
179 }
180
181 while crate_def_maps_done < crate_def_maps_total
182 || crate_import_maps_done < crate_import_maps_total
183 || module_symbols_done < module_symbols_total
184 {
185 db.unwind_if_revision_cancelled();
186
187 let progress = ParallelPrimeCachesProgress {
188 crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
189 crates_done: crate_def_maps_done,
190 crates_total: crate_def_maps_total,
191 work_type: "Indexing",
192 };
193
194 cb(progress);
195
196 let progress = match progress_receiver.recv() {
198 Ok(p) => p,
199 Err(crossbeam_channel::RecvError) => {
200 cb(ParallelPrimeCachesProgress {
202 crates_currently_indexing: vec![],
203 crates_done: crate_def_maps_done,
204 crates_total: crate_def_maps_done,
205 work_type: "Done",
206 });
207 return;
208 }
209 };
210
211 match progress {
212 ParallelPrimeCacheWorkerProgress::BeginCrateDefMap { crate_id, crate_name } => {
213 crates_currently_indexing.insert(crate_id, crate_name);
214 }
215 ParallelPrimeCacheWorkerProgress::EndCrateDefMap { crate_id } => {
216 crates_currently_indexing.swap_remove(&crate_id);
217 crate_def_maps_done += 1;
218
219 for &dep in &reverse_deps[&crate_id] {
221 let to_be_done = to_be_done_deps.get_mut(&dep).unwrap();
222 *to_be_done -= 1;
223 if *to_be_done == 0 {
224 let dep_name = crate_name(db, dep);
225 def_map_work_sender.send((dep, dep_name)).ok();
226 }
227 }
228
229 if crate_def_maps_done == crate_def_maps_total {
230 cb(ParallelPrimeCachesProgress {
231 crates_currently_indexing: vec![],
232 crates_done: crate_def_maps_done,
233 crates_total: crate_def_maps_done,
234 work_type: "Collecting Symbols",
235 });
236 }
237
238 let origin = &crate_id.data(db).origin;
239 if origin.is_lang() {
240 crate_import_maps_total += 1;
241 import_map_work_sender.send(crate_id).ok();
242 } else if origin.is_local() {
243 let modules = hir::Crate::from(crate_id).modules(db);
253 module_symbols_total += modules.len();
254 for module in modules {
255 symbols_work_sender.send(module).ok();
256 }
257 }
258 }
259 ParallelPrimeCacheWorkerProgress::EndCrateImportMap => crate_import_maps_done += 1,
260 ParallelPrimeCacheWorkerProgress::EndModuleSymbols => module_symbols_done += 1,
261 ParallelPrimeCacheWorkerProgress::Cancelled(cancelled) => {
262 std::panic::resume_unwind(Box::new(cancelled));
264 }
265 }
266 }
267}
268
269fn crate_name(db: &RootDatabase, krate: Crate) -> Symbol {
270 krate
271 .extra_data(db)
272 .display_name
273 .as_deref()
274 .cloned()
275 .unwrap_or_else(|| Symbol::integer(salsa::plumbing::AsId::as_id(&krate).index() as usize))
276}