1use std::panic::AssertUnwindSafe;
6
7use hir::{Symbol, db::DefDatabase};
8use rustc_hash::FxHashMap;
9use salsa::{Cancelled, Database};
10
11use crate::{
12 FxIndexMap, RootDatabase,
13 base_db::{Crate, RootQueryDb},
14 symbol_index::SymbolIndex,
15};
16
17#[derive(Debug)]
19pub struct ParallelPrimeCachesProgress {
20 pub crates_currently_indexing: Vec<Symbol>,
22 pub crates_total: usize,
24 pub crates_done: usize,
26 pub work_type: &'static str,
27}
28
29pub fn parallel_prime_caches(
30 db: &RootDatabase,
31 num_worker_threads: usize,
32 cb: &(dyn Fn(ParallelPrimeCachesProgress) + Sync),
33) {
34 let _p = tracing::info_span!("parallel_prime_caches").entered();
35
36 enum ParallelPrimeCacheWorkerProgress {
37 BeginCrateDefMap { crate_id: Crate, crate_name: Symbol },
38 EndCrateDefMap { crate_id: Crate },
39 EndCrateImportMap,
40 EndSema,
41 EndModuleSymbols,
42 Cancelled(Cancelled),
43 }
44
45 let (reverse_deps, mut to_be_done_deps) = {
59 let all_crates = db.all_crates();
60 let to_be_done_deps = all_crates
61 .iter()
62 .map(|&krate| (krate, krate.data(db).dependencies.len() as u32))
63 .collect::<FxHashMap<_, _>>();
64 let mut reverse_deps =
65 all_crates.iter().map(|&krate| (krate, Vec::new())).collect::<FxHashMap<_, _>>();
66 for &krate in &*all_crates {
67 for dep in &krate.data(db).dependencies {
68 reverse_deps.get_mut(&dep.crate_id).unwrap().push(krate);
69 }
70 }
71 (reverse_deps, to_be_done_deps)
72 };
73
74 let (
75 def_map_work_sender,
76 import_map_work_sender,
77 symbols_work_sender,
78 sema_work_sender,
79 progress_receiver,
80 ) = {
81 let (progress_sender, progress_receiver) = crossbeam_channel::unbounded();
82 let (def_map_work_sender, def_map_work_receiver) = crossbeam_channel::unbounded();
83 let (import_map_work_sender, import_map_work_receiver) = crossbeam_channel::unbounded();
84 let (sema_work_sender, sema_work_receiver) = crossbeam_channel::unbounded();
85 let (symbols_work_sender, symbols_work_receiver) = crossbeam_channel::unbounded();
86 let prime_caches_worker = move |db: RootDatabase| {
87 let handle_def_map = |crate_id, crate_name| {
88 progress_sender.send(ParallelPrimeCacheWorkerProgress::BeginCrateDefMap {
89 crate_id,
90 crate_name,
91 })?;
92
93 let cancelled = Cancelled::catch(|| {
94 _ = hir::crate_def_map(&db, crate_id);
95 });
96
97 match cancelled {
98 Ok(()) => progress_sender
99 .send(ParallelPrimeCacheWorkerProgress::EndCrateDefMap { crate_id })?,
100 Err(cancelled) => progress_sender
101 .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
102 }
103
104 Ok::<_, crossbeam_channel::SendError<_>>(())
105 };
106 let handle_sema = |crate_id| {
107 let cancelled = Cancelled::catch(|| {
108 hir::attach_db(&db, || {
109 _ = hir::TraitImpls::for_crate(&db, crate_id);
112 _ = hir::crate_lang_items(&db, crate_id);
115 })
116 });
117
118 match cancelled {
119 Ok(()) => progress_sender.send(ParallelPrimeCacheWorkerProgress::EndSema)?,
120 Err(cancelled) => progress_sender
121 .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
122 }
123
124 Ok::<_, crossbeam_channel::SendError<_>>(())
125 };
126 let handle_import_map = |crate_id| {
127 let cancelled = Cancelled::catch(|| _ = db.import_map(crate_id));
128
129 match cancelled {
130 Ok(()) => {
131 progress_sender.send(ParallelPrimeCacheWorkerProgress::EndCrateImportMap)?
132 }
133 Err(cancelled) => progress_sender
134 .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
135 }
136
137 Ok::<_, crossbeam_channel::SendError<_>>(())
138 };
139 let handle_symbols = |module: hir::Module| {
140 let cancelled = Cancelled::catch(AssertUnwindSafe(|| {
141 _ = SymbolIndex::module_symbols(&db, module)
142 }));
143
144 match cancelled {
145 Ok(()) => {
146 progress_sender.send(ParallelPrimeCacheWorkerProgress::EndModuleSymbols)?
147 }
148 Err(cancelled) => progress_sender
149 .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
150 }
151
152 Ok::<_, crossbeam_channel::SendError<_>>(())
153 };
154
155 loop {
156 db.unwind_if_revision_cancelled();
157
158 crossbeam_channel::select_biased! {
160 recv(def_map_work_receiver) -> work => {
161 let Ok((crate_id, crate_name)) = work else { break };
162 handle_def_map(crate_id, crate_name)?;
163 }
164 recv(sema_work_receiver) -> work => {
165 let Ok(crate_id) = work else { break };
166 handle_sema(crate_id)?;
167 }
168 recv(import_map_work_receiver) -> work => {
169 let Ok(crate_id) = work else { break };
170 handle_import_map(crate_id)?;
171 }
172 recv(symbols_work_receiver) -> work => {
173 let Ok(module) = work else { break };
174 handle_symbols(module)?;
175 }
176 }
177 }
178 Ok::<_, crossbeam_channel::SendError<_>>(())
179 };
180
181 for id in 0..num_worker_threads {
182 stdx::thread::Builder::new(
183 stdx::thread::ThreadIntent::Worker,
184 format!("PrimeCaches#{id}"),
185 )
186 .allow_leak(true)
187 .spawn({
188 let worker = prime_caches_worker.clone();
189 let db = db.clone();
190 move || worker(db)
191 })
192 .expect("failed to spawn thread");
193 }
194
195 (
196 def_map_work_sender,
197 import_map_work_sender,
198 symbols_work_sender,
199 sema_work_sender,
200 progress_receiver,
201 )
202 };
203
204 let crate_def_maps_total = db.all_crates().len();
205 let mut crate_def_maps_done = 0;
206 let (mut crate_import_maps_total, mut crate_import_maps_done) = (0usize, 0usize);
207 let (mut module_symbols_total, mut module_symbols_done) = (0usize, 0usize);
208 let (mut sema_total, mut sema_done) = (0usize, 0usize);
209
210 let mut crates_currently_indexing =
213 FxIndexMap::with_capacity_and_hasher(num_worker_threads, Default::default());
214
215 for (&krate, &to_be_done_deps) in &to_be_done_deps {
216 if to_be_done_deps != 0 {
217 continue;
218 }
219
220 let name = crate_name(db, krate);
221 def_map_work_sender.send((krate, name)).ok();
222 }
223
224 while crate_def_maps_done < crate_def_maps_total
225 || crate_import_maps_done < crate_import_maps_total
226 || module_symbols_done < module_symbols_total
227 || sema_done < sema_total
228 {
229 db.unwind_if_revision_cancelled();
230
231 let progress = ParallelPrimeCachesProgress {
232 crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
233 crates_done: crate_def_maps_done,
234 crates_total: crate_def_maps_total,
235 work_type: "Indexing",
236 };
237
238 cb(progress);
239
240 let progress = match progress_receiver.recv() {
242 Ok(p) => p,
243 Err(crossbeam_channel::RecvError) => {
244 cb(ParallelPrimeCachesProgress {
246 crates_currently_indexing: vec![],
247 crates_done: crate_def_maps_done,
248 crates_total: crate_def_maps_done,
249 work_type: "Done",
250 });
251 return;
252 }
253 };
254
255 match progress {
256 ParallelPrimeCacheWorkerProgress::BeginCrateDefMap { crate_id, crate_name } => {
257 crates_currently_indexing.insert(crate_id, crate_name);
258 }
259 ParallelPrimeCacheWorkerProgress::EndCrateDefMap { crate_id } => {
260 crates_currently_indexing.swap_remove(&crate_id);
261 crate_def_maps_done += 1;
262
263 for &dep in &reverse_deps[&crate_id] {
265 let to_be_done = to_be_done_deps.get_mut(&dep).unwrap();
266 *to_be_done -= 1;
267 if *to_be_done == 0 {
268 let dep_name = crate_name(db, dep);
269 def_map_work_sender.send((dep, dep_name)).ok();
270 }
271 }
272
273 if crate_def_maps_done == crate_def_maps_total {
274 cb(ParallelPrimeCachesProgress {
276 crates_currently_indexing: vec![],
277 crates_done: crate_def_maps_done,
278 crates_total: crate_def_maps_done,
279 work_type: "Collecting Symbols",
280 });
281 }
282
283 sema_work_sender.send(crate_id).ok();
284 sema_total += 1;
285 let origin = &crate_id.data(db).origin;
286 if origin.is_lang() {
287 crate_import_maps_total += 1;
288 import_map_work_sender.send(crate_id).ok();
289 } else if origin.is_local() {
290 let modules = hir::Crate::from(crate_id).modules(db);
300 module_symbols_total += modules.len();
301 for module in modules {
302 symbols_work_sender.send(module).ok();
303 }
304 }
305 }
306 ParallelPrimeCacheWorkerProgress::EndCrateImportMap => crate_import_maps_done += 1,
307 ParallelPrimeCacheWorkerProgress::EndModuleSymbols => module_symbols_done += 1,
308 ParallelPrimeCacheWorkerProgress::EndSema => sema_done += 1,
309 ParallelPrimeCacheWorkerProgress::Cancelled(cancelled) => {
310 std::panic::resume_unwind(Box::new(cancelled));
312 }
313 }
314 }
315}
316
317fn crate_name(db: &RootDatabase, krate: Crate) -> Symbol {
318 krate
319 .extra_data(db)
320 .display_name
321 .as_deref()
322 .cloned()
323 .unwrap_or_else(|| Symbol::integer(salsa::plumbing::AsId::as_id(&krate).index() as usize))
324}