1use std::panic::AssertUnwindSafe;
6
7use hir::{Symbol, db::DefDatabase};
8use rustc_hash::FxHashMap;
9use salsa::{Cancelled, Database};
10
11use crate::{
12 FxIndexMap, RootDatabase,
13 base_db::{Crate, RootQueryDb},
14 symbol_index::SymbolIndex,
15};
16
17#[derive(Debug)]
19pub struct ParallelPrimeCachesProgress {
20 pub crates_currently_indexing: Vec<Symbol>,
22 pub crates_total: usize,
24 pub crates_done: usize,
26 pub work_type: &'static str,
27}
28
29pub fn parallel_prime_caches(
30 db: &RootDatabase,
31 num_worker_threads: usize,
32 cb: &(dyn Fn(ParallelPrimeCachesProgress) + Sync),
33) {
34 let _p = tracing::info_span!("parallel_prime_caches").entered();
35
36 enum ParallelPrimeCacheWorkerProgress {
37 BeginCrateDefMap { crate_id: Crate, crate_name: Symbol },
38 EndCrateDefMap { crate_id: Crate },
39 EndCrateImportMap,
40 EndSema,
41 EndModuleSymbols,
42 Cancelled(Cancelled),
43 }
44
45 let (reverse_deps, mut to_be_done_deps) = {
59 let all_crates = db.all_crates();
60 let to_be_done_deps = all_crates
61 .iter()
62 .map(|&krate| (krate, krate.data(db).dependencies.len() as u32))
63 .collect::<FxHashMap<_, _>>();
64 let mut reverse_deps =
65 all_crates.iter().map(|&krate| (krate, Vec::new())).collect::<FxHashMap<_, _>>();
66 for &krate in &*all_crates {
67 for dep in &krate.data(db).dependencies {
68 reverse_deps.get_mut(&dep.crate_id).unwrap().push(krate);
69 }
70 }
71 (reverse_deps, to_be_done_deps)
72 };
73
74 let (
75 def_map_work_sender,
76 import_map_work_sender,
77 symbols_work_sender,
78 sema_work_sender,
79 progress_receiver,
80 ) = {
81 let (progress_sender, progress_receiver) = crossbeam_channel::unbounded();
82 let (def_map_work_sender, def_map_work_receiver) = crossbeam_channel::unbounded();
83 let (import_map_work_sender, import_map_work_receiver) = crossbeam_channel::unbounded();
84 let (sema_work_sender, sema_work_receiver) = crossbeam_channel::unbounded();
85 let (symbols_work_sender, symbols_work_receiver) = crossbeam_channel::unbounded();
86 let prime_caches_worker = move |db: RootDatabase| {
87 let handle_def_map = |crate_id, crate_name| {
88 progress_sender.send(ParallelPrimeCacheWorkerProgress::BeginCrateDefMap {
89 crate_id,
90 crate_name,
91 })?;
92
93 let cancelled = Cancelled::catch(|| {
94 _ = hir::crate_def_map(&db, crate_id);
95 });
96
97 match cancelled {
98 Ok(()) => progress_sender
99 .send(ParallelPrimeCacheWorkerProgress::EndCrateDefMap { crate_id })?,
100 Err(cancelled) => progress_sender
101 .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
102 }
103
104 Ok::<_, crossbeam_channel::SendError<_>>(())
105 };
106 let handle_sema = |crate_id| {
107 let cancelled = Cancelled::catch(|| {
108 hir::attach_db(&db, || {
109 _ = hir::TraitImpls::for_crate(&db, crate_id);
114 })
115 });
116
117 match cancelled {
118 Ok(()) => progress_sender.send(ParallelPrimeCacheWorkerProgress::EndSema)?,
119 Err(cancelled) => progress_sender
120 .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
121 }
122
123 Ok::<_, crossbeam_channel::SendError<_>>(())
124 };
125 let handle_import_map = |crate_id| {
126 let cancelled = Cancelled::catch(|| _ = db.import_map(crate_id));
127
128 match cancelled {
129 Ok(()) => {
130 progress_sender.send(ParallelPrimeCacheWorkerProgress::EndCrateImportMap)?
131 }
132 Err(cancelled) => progress_sender
133 .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
134 }
135
136 Ok::<_, crossbeam_channel::SendError<_>>(())
137 };
138 let handle_symbols = |module: hir::Module| {
139 let cancelled = Cancelled::catch(AssertUnwindSafe(|| {
140 _ = SymbolIndex::module_symbols(&db, module)
141 }));
142
143 match cancelled {
144 Ok(()) => {
145 progress_sender.send(ParallelPrimeCacheWorkerProgress::EndModuleSymbols)?
146 }
147 Err(cancelled) => progress_sender
148 .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
149 }
150
151 Ok::<_, crossbeam_channel::SendError<_>>(())
152 };
153
154 loop {
155 db.unwind_if_revision_cancelled();
156
157 crossbeam_channel::select_biased! {
159 recv(def_map_work_receiver) -> work => {
160 let Ok((crate_id, crate_name)) = work else { break };
161 handle_def_map(crate_id, crate_name)?;
162 }
163 recv(sema_work_receiver) -> work => {
164 let Ok(crate_id) = work else { break };
165 handle_sema(crate_id)?;
166 }
167 recv(import_map_work_receiver) -> work => {
168 let Ok(crate_id) = work else { break };
169 handle_import_map(crate_id)?;
170 }
171 recv(symbols_work_receiver) -> work => {
172 let Ok(module) = work else { break };
173 handle_symbols(module)?;
174 }
175 }
176 }
177 Ok::<_, crossbeam_channel::SendError<_>>(())
178 };
179
180 for id in 0..num_worker_threads {
181 stdx::thread::Builder::new(
182 stdx::thread::ThreadIntent::Worker,
183 format!("PrimeCaches#{id}"),
184 )
185 .allow_leak(true)
186 .spawn({
187 let worker = prime_caches_worker.clone();
188 let db = db.clone();
189 move || worker(db)
190 })
191 .expect("failed to spawn thread");
192 }
193
194 (
195 def_map_work_sender,
196 import_map_work_sender,
197 symbols_work_sender,
198 sema_work_sender,
199 progress_receiver,
200 )
201 };
202
203 let crate_def_maps_total = db.all_crates().len();
204 let mut crate_def_maps_done = 0;
205 let (mut crate_import_maps_total, mut crate_import_maps_done) = (0usize, 0usize);
206 let (mut module_symbols_total, mut module_symbols_done) = (0usize, 0usize);
207 let (mut sema_total, mut sema_done) = (0usize, 0usize);
208
209 let mut crates_currently_indexing =
212 FxIndexMap::with_capacity_and_hasher(num_worker_threads, Default::default());
213
214 for (&krate, &to_be_done_deps) in &to_be_done_deps {
215 if to_be_done_deps != 0 {
216 continue;
217 }
218
219 let name = crate_name(db, krate);
220 def_map_work_sender.send((krate, name)).ok();
221 }
222
223 while crate_def_maps_done < crate_def_maps_total
224 || crate_import_maps_done < crate_import_maps_total
225 || module_symbols_done < module_symbols_total
226 || sema_done < sema_total
227 {
228 db.unwind_if_revision_cancelled();
229
230 let progress = ParallelPrimeCachesProgress {
231 crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
232 crates_done: crate_def_maps_done,
233 crates_total: crate_def_maps_total,
234 work_type: "Indexing",
235 };
236
237 cb(progress);
238
239 let progress = match progress_receiver.recv() {
241 Ok(p) => p,
242 Err(crossbeam_channel::RecvError) => {
243 cb(ParallelPrimeCachesProgress {
245 crates_currently_indexing: vec![],
246 crates_done: crate_def_maps_done,
247 crates_total: crate_def_maps_done,
248 work_type: "Done",
249 });
250 return;
251 }
252 };
253
254 match progress {
255 ParallelPrimeCacheWorkerProgress::BeginCrateDefMap { crate_id, crate_name } => {
256 crates_currently_indexing.insert(crate_id, crate_name);
257 }
258 ParallelPrimeCacheWorkerProgress::EndCrateDefMap { crate_id } => {
259 crates_currently_indexing.swap_remove(&crate_id);
260 crate_def_maps_done += 1;
261
262 for &dep in &reverse_deps[&crate_id] {
264 let to_be_done = to_be_done_deps.get_mut(&dep).unwrap();
265 *to_be_done -= 1;
266 if *to_be_done == 0 {
267 let dep_name = crate_name(db, dep);
268 def_map_work_sender.send((dep, dep_name)).ok();
269 }
270 }
271
272 if crate_def_maps_done == crate_def_maps_total {
273 cb(ParallelPrimeCachesProgress {
274 crates_currently_indexing: vec![],
275 crates_done: crate_def_maps_done,
276 crates_total: crate_def_maps_done,
277 work_type: "Collecting Symbols",
278 });
279 }
280
281 sema_work_sender.send(crate_id).ok();
282 sema_total += 1;
283 let origin = &crate_id.data(db).origin;
284 if origin.is_lang() {
285 crate_import_maps_total += 1;
286 import_map_work_sender.send(crate_id).ok();
287 } else if origin.is_local() {
288 let modules = hir::Crate::from(crate_id).modules(db);
298 module_symbols_total += modules.len();
299 for module in modules {
300 symbols_work_sender.send(module).ok();
301 }
302 }
303 }
304 ParallelPrimeCacheWorkerProgress::EndCrateImportMap => crate_import_maps_done += 1,
305 ParallelPrimeCacheWorkerProgress::EndModuleSymbols => module_symbols_done += 1,
306 ParallelPrimeCacheWorkerProgress::EndSema => sema_done += 1,
307 ParallelPrimeCacheWorkerProgress::Cancelled(cancelled) => {
308 std::panic::resume_unwind(Box::new(cancelled));
310 }
311 }
312 }
313}
314
315fn crate_name(db: &RootDatabase, krate: Crate) -> Symbol {
316 krate
317 .extra_data(db)
318 .display_name
319 .as_deref()
320 .cloned()
321 .unwrap_or_else(|| Symbol::integer(salsa::plumbing::AsId::as_id(&krate).index() as usize))
322}