1use std::panic::AssertUnwindSafe;
6
7use base_db::all_crates;
8use hir::{Symbol, import_map::ImportMap};
9use rustc_hash::FxHashMap;
10use salsa::{Cancelled, Database};
11
12use crate::{FxIndexMap, RootDatabase, base_db::Crate, symbol_index::SymbolIndex};
13
/// Progress report handed to the callback of [`parallel_prime_caches`].
#[derive(Debug)]
pub struct ParallelPrimeCachesProgress {
    /// Names of the crates whose def maps are being computed at the time of the report.
    /// Empty in the "Collecting Symbols" and "Done" reports.
    pub crates_currently_indexing: Vec<Symbol>,
    /// Total number of crates to index. The "Collecting Symbols" and "Done" reports set
    /// this equal to `crates_done`.
    pub crates_total: usize,
    /// Number of crates whose def map computation has finished.
    pub crates_done: usize,
    /// Label for the current phase; `parallel_prime_caches` emits "Indexing",
    /// "Collecting Symbols" and "Done".
    pub work_type: &'static str,
}
25
/// Warms up database caches for every crate using a pool of worker threads.
///
/// Spawns `num_worker_threads` threads, each with its own clone of `db`, and feeds them
/// four kinds of work: crate def maps (`hir::crate_def_map`), trait impls
/// (`hir::TraitImpls::for_crate`), import maps (`ImportMap::of`) and module symbols
/// (`SymbolIndex::module_symbols`). Def maps are scheduled in dependency order: a crate
/// is queued only after the def maps of all of its dependencies have finished.
///
/// `cb` is invoked on the calling thread with an "Indexing" report before each wait for
/// a worker message, once with "Collecting Symbols" when the last def map finishes, and
/// with "Done" if all workers exit before the outstanding work is reported complete.
///
/// If a worker observes a `Cancelled`, it is forwarded to the calling thread and
/// re-raised there with `std::panic::resume_unwind`.
pub fn parallel_prime_caches(
    db: &RootDatabase,
    num_worker_threads: usize,
    cb: &(dyn Fn(ParallelPrimeCachesProgress) + Sync),
) {
    let _p = tracing::info_span!("parallel_prime_caches").entered();

    // Messages sent from the worker threads back to the coordinating (calling) thread.
    enum ParallelPrimeCacheWorkerProgress {
        BeginCrateDefMap { crate_id: Crate, crate_name: Symbol },
        EndCrateDefMap { crate_id: Crate },
        EndCrateImportMap,
        EndSema,
        EndModuleSymbols,
        Cancelled(Cancelled),
    }

    // `to_be_done_deps`: per crate, the number of its dependencies whose def map has not
    // finished yet (initially the full dependency count).
    // `reverse_deps`: per crate, the crates that depend on it, so that finishing one crate
    // can decrement the counters of its dependents.
    let (reverse_deps, mut to_be_done_deps) = {
        let all_crates = all_crates(db);
        let to_be_done_deps = all_crates
            .iter()
            .map(|&krate| (krate, krate.data(db).dependencies.len() as u32))
            .collect::<FxHashMap<_, _>>();
        let mut reverse_deps =
            all_crates.iter().map(|&krate| (krate, Vec::new())).collect::<FxHashMap<_, _>>();
        for &krate in &*all_crates {
            for dep in &krate.data(db).dependencies {
                // Every dependency is expected to be a key of `reverse_deps`, i.e. to be
                // listed in `all_crates`; otherwise this `unwrap` panics.
                reverse_deps.get_mut(&dep.crate_id).unwrap().push(krate);
            }
        }
        (reverse_deps, to_be_done_deps)
    };

    // Set up one channel per kind of work plus one channel for progress messages, then
    // spawn the workers. Only the work senders and the progress receiver leave this block;
    // the work receivers and the progress sender are owned by the worker closures.
    let (
        def_map_work_sender,
        import_map_work_sender,
        symbols_work_sender,
        sema_work_sender,
        progress_receiver,
    ) = {
        let (progress_sender, progress_receiver) = crossbeam_channel::unbounded();
        let (def_map_work_sender, def_map_work_receiver) = crossbeam_channel::unbounded();
        let (import_map_work_sender, import_map_work_receiver) = crossbeam_channel::unbounded();
        let (sema_work_sender, sema_work_receiver) = crossbeam_channel::unbounded();
        let (symbols_work_sender, symbols_work_receiver) = crossbeam_channel::unbounded();
        // Body of each worker thread. It returns `Err` only when sending a progress
        // message fails, i.e. when the progress receiver has been dropped.
        let prime_caches_worker = move |db: RootDatabase| {
            // Each handler runs its query inside `Cancelled::catch`, so a cancellation is
            // turned into a value that can be reported instead of unwinding the worker.
            let handle_def_map = |crate_id, crate_name| {
                // Announce the crate first so it shows up in `crates_currently_indexing`.
                progress_sender.send(ParallelPrimeCacheWorkerProgress::BeginCrateDefMap {
                    crate_id,
                    crate_name,
                })?;

                let cancelled = Cancelled::catch(|| {
                    _ = hir::crate_def_map(&db, crate_id);
                });

                match cancelled {
                    Ok(()) => progress_sender
                        .send(ParallelPrimeCacheWorkerProgress::EndCrateDefMap { crate_id })?,
                    Err(cancelled) => progress_sender
                        .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
                }

                Ok::<_, crossbeam_channel::SendError<_>>(())
            };
            let handle_sema = |crate_id| {
                let cancelled = Cancelled::catch(|| {
                    // NOTE(review): `hir::attach_db` presumably makes `db` available to the
                    // code run inside the closure -- confirm against the `hir` crate.
                    hir::attach_db(&db, || {
                        // The result is discarded; the call is made only to populate caches.
                        _ = hir::TraitImpls::for_crate(&db, crate_id);
                    })
                });

                match cancelled {
                    Ok(()) => progress_sender.send(ParallelPrimeCacheWorkerProgress::EndSema)?,
                    Err(cancelled) => progress_sender
                        .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
                }

                Ok::<_, crossbeam_channel::SendError<_>>(())
            };
            let handle_import_map = |crate_id| {
                let cancelled = Cancelled::catch(|| _ = ImportMap::of(&db, crate_id));

                match cancelled {
                    Ok(()) => {
                        progress_sender.send(ParallelPrimeCacheWorkerProgress::EndCrateImportMap)?
                    }
                    Err(cancelled) => progress_sender
                        .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
                }

                Ok::<_, crossbeam_channel::SendError<_>>(())
            };
            let handle_symbols = |module: hir::Module| {
                // NOTE(review): `AssertUnwindSafe` is presumably needed because the closure
                // captures something that is not `UnwindSafe` -- confirm.
                let cancelled = Cancelled::catch(AssertUnwindSafe(|| {
                    _ = SymbolIndex::module_symbols(&db, module)
                }));

                match cancelled {
                    Ok(()) => {
                        progress_sender.send(ParallelPrimeCacheWorkerProgress::EndModuleSymbols)?
                    }
                    Err(cancelled) => progress_sender
                        .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
                }

                Ok::<_, crossbeam_channel::SendError<_>>(())
            };

            loop {
                // NOTE(review): presumably unwinds this worker if the database revision has
                // been cancelled -- confirm against salsa's `Database` trait.
                db.unwind_if_revision_cancelled();

                // `select_biased!` tries the arms in the order written, so when several
                // kinds of work are available, def maps are taken first, then trait impls,
                // then import maps, then module symbols. A receive error means that
                // channel's sender was dropped, which ends the worker loop.
                crossbeam_channel::select_biased! {
                    recv(def_map_work_receiver) -> work => {
                        let Ok((crate_id, crate_name)) = work else { break };
                        handle_def_map(crate_id, crate_name)?;
                    }
                    recv(sema_work_receiver) -> work => {
                        let Ok(crate_id) = work else { break };
                        handle_sema(crate_id)?;
                    }
                    recv(import_map_work_receiver) -> work => {
                        let Ok(crate_id) = work else { break };
                        handle_import_map(crate_id)?;
                    }
                    recv(symbols_work_receiver) -> work => {
                        let Ok(module) = work else { break };
                        handle_symbols(module)?;
                    }
                }
            }
            Ok::<_, crossbeam_channel::SendError<_>>(())
        };

        for id in 0..num_worker_threads {
            stdx::thread::Builder::new(
                stdx::thread::ThreadIntent::Worker,
                format!("PrimeCaches#{id}"),
            )
            // NOTE(review): presumably permits dropping the handle without joining the
            // thread; the handle returned by `spawn` is indeed discarded below -- confirm
            // against `stdx::thread`.
            .allow_leak(true)
            .spawn({
                // Each thread gets its own clone of the worker closure (and with it clones
                // of the channel endpoints) and its own clone of the database.
                let worker = prime_caches_worker.clone();
                let db = db.clone();
                move || worker(db)
            })
            .expect("failed to spawn thread");
        }

        (
            def_map_work_sender,
            import_map_work_sender,
            symbols_work_sender,
            sema_work_sender,
            progress_receiver,
        )
    };

    // Def maps are needed for every crate, so that total is known up front. The other
    // totals start at zero and grow as def maps finish and follow-up work is queued.
    let crate_def_maps_total = all_crates(db).len();
    let mut crate_def_maps_done = 0;
    let (mut crate_import_maps_total, mut crate_import_maps_done) = (0usize, 0usize);
    let (mut module_symbols_total, mut module_symbols_done) = (0usize, 0usize);
    let (mut sema_total, mut sema_done) = (0usize, 0usize);

    // Crates whose def map is in progress, keyed by crate; the values are the names
    // reported to `cb`.
    let mut crates_currently_indexing =
        FxIndexMap::with_capacity_and_hasher(num_worker_threads, Default::default());

    // Seed the queue with the crates that have no dependencies at all.
    for (&krate, &to_be_done_deps) in &to_be_done_deps {
        if to_be_done_deps != 0 {
            continue;
        }

        let name = crate_name(db, krate);
        def_map_work_sender.send((krate, name)).ok();
    }

    // Keep processing worker messages until every kind of queued work has been reported
    // as finished.
    while crate_def_maps_done < crate_def_maps_total
        || crate_import_maps_done < crate_import_maps_total
        || module_symbols_done < module_symbols_total
        || sema_done < sema_total
    {
        db.unwind_if_revision_cancelled();

        let progress = ParallelPrimeCachesProgress {
            crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
            crates_done: crate_def_maps_done,
            crates_total: crate_def_maps_total,
            work_type: "Indexing",
        };

        cb(progress);

        // Block until some worker reports progress.
        let progress = match progress_receiver.recv() {
            Ok(p) => p,
            Err(crossbeam_channel::RecvError) => {
                // Every progress sender is gone, so all workers have exited; report
                // completion and stop waiting.
                cb(ParallelPrimeCachesProgress {
                    crates_currently_indexing: vec![],
                    crates_done: crate_def_maps_done,
                    crates_total: crate_def_maps_done,
                    work_type: "Done",
                });
                return;
            }
        };

        match progress {
            ParallelPrimeCacheWorkerProgress::BeginCrateDefMap { crate_id, crate_name } => {
                crates_currently_indexing.insert(crate_id, crate_name);
            }
            ParallelPrimeCacheWorkerProgress::EndCrateDefMap { crate_id } => {
                crates_currently_indexing.swap_remove(&crate_id);
                crate_def_maps_done += 1;

                // Queue every dependent crate whose last outstanding dependency was this one.
                for &dep in &reverse_deps[&crate_id] {
                    let to_be_done = to_be_done_deps.get_mut(&dep).unwrap();
                    *to_be_done -= 1;
                    if *to_be_done == 0 {
                        let dep_name = crate_name(db, dep);
                        def_map_work_sender.send((dep, dep_name)).ok();
                    }
                }

                if crate_def_maps_done == crate_def_maps_total {
                    cb(ParallelPrimeCachesProgress {
                        crates_currently_indexing: vec![],
                        crates_done: crate_def_maps_done,
                        crates_total: crate_def_maps_done,
                        work_type: "Collecting Symbols",
                    });
                }

                // Now that the def map exists, queue the follow-up work for this crate:
                // trait impls for every crate, an import map for crates whose origin
                // `is_lang()`, and per-module symbols for crates whose origin `is_local()`.
                sema_work_sender.send(crate_id).ok();
                sema_total += 1;
                let origin = &crate_id.data(db).origin;
                if origin.is_lang() {
                    crate_import_maps_total += 1;
                    import_map_work_sender.send(crate_id).ok();
                } else if origin.is_local() {
                    let modules = hir::Crate::from(crate_id).modules(db);
                    module_symbols_total += modules.len();
                    for module in modules {
                        symbols_work_sender.send(module).ok();
                    }
                }
            }
            ParallelPrimeCacheWorkerProgress::EndCrateImportMap => crate_import_maps_done += 1,
            ParallelPrimeCacheWorkerProgress::EndModuleSymbols => module_symbols_done += 1,
            ParallelPrimeCacheWorkerProgress::EndSema => sema_done += 1,
            ParallelPrimeCacheWorkerProgress::Cancelled(cancelled) => {
                // Re-raise the worker's cancellation on the calling thread.
                std::panic::resume_unwind(Box::new(cancelled));
            }
        }
    }
}
311
312fn crate_name(db: &RootDatabase, krate: Crate) -> Symbol {
313 krate
314 .extra_data(db)
315 .display_name
316 .as_deref()
317 .cloned()
318 .unwrap_or_else(|| Symbol::integer(salsa::plumbing::AsId::as_id(&krate).index() as usize))
319}