stdx/thread/
pool.rs

1//! [`Pool`] implements a basic custom thread pool
2//! inspired by the [`threadpool` crate](http://docs.rs/threadpool).
3//! When you spawn a task you specify a thread intent
4//! so the pool can schedule it to run on a thread with that intent.
5//! rust-analyzer uses this to prioritize work based on latency requirements.
6//!
7//! The thread pool is implemented entirely using
8//! the threading utilities in [`crate::thread`].
9
10use std::{
11    marker::PhantomData,
12    panic::{self, UnwindSafe},
13    sync::{
14        Arc,
15        atomic::{AtomicUsize, Ordering},
16    },
17};
18
19use crossbeam_channel::{Receiver, Sender};
20use crossbeam_utils::sync::WaitGroup;
21
22use crate::thread::{Builder, JoinHandle, ThreadIntent};
23
24pub struct Pool {
25    // `_handles` is never read: the field is present
26    // only for its `Drop` impl.
27
28    // The worker threads exit once the channel closes;
29    // make sure to keep `job_sender` above `handles`
30    // so that the channel is actually closed
31    // before we join the worker threads!
32    job_sender: Sender<Job>,
33    _handles: Box<[JoinHandle]>,
34    extant_tasks: Arc<AtomicUsize>,
35}
36
37struct Job {
38    requested_intent: ThreadIntent,
39    f: Box<dyn FnOnce() + Send + UnwindSafe + 'static>,
40}
41
42impl Pool {
43    /// # Panics
44    ///
45    /// Panics if job panics
46    #[must_use]
47    pub fn new(threads: usize) -> Self {
48        const STACK_SIZE: usize = 8 * 1024 * 1024;
49        const INITIAL_INTENT: ThreadIntent = ThreadIntent::Worker;
50
51        let (job_sender, job_receiver) = crossbeam_channel::unbounded();
52        let extant_tasks = Arc::new(AtomicUsize::new(0));
53
54        let mut handles = Vec::with_capacity(threads);
55        for idx in 0..threads {
56            let handle = Builder::new(INITIAL_INTENT, format!("Worker{idx}",))
57                .stack_size(STACK_SIZE)
58                .allow_leak(true)
59                .spawn({
60                    let extant_tasks = Arc::clone(&extant_tasks);
61                    let job_receiver: Receiver<Job> = job_receiver.clone();
62                    move || {
63                        let mut current_intent = INITIAL_INTENT;
64                        for job in job_receiver {
65                            if job.requested_intent != current_intent {
66                                job.requested_intent.apply_to_current_thread();
67                                current_intent = job.requested_intent;
68                            }
69                            extant_tasks.fetch_add(1, Ordering::SeqCst);
70                            // discard the panic, we should've logged the backtrace already
71                            drop(panic::catch_unwind(job.f));
72                            extant_tasks.fetch_sub(1, Ordering::SeqCst);
73                        }
74                    }
75                })
76                .expect("failed to spawn thread");
77
78            handles.push(handle);
79        }
80
81        Self { _handles: handles.into_boxed_slice(), extant_tasks, job_sender }
82    }
83
84    pub fn spawn<F>(&self, intent: ThreadIntent, f: F)
85    where
86        F: FnOnce() + Send + UnwindSafe + 'static,
87    {
88        let f = Box::new(move || {
89            if cfg!(debug_assertions) {
90                intent.assert_is_used_on_current_thread();
91            }
92            f();
93        });
94
95        let job = Job { requested_intent: intent, f };
96        self.job_sender.send(job).unwrap();
97    }
98
99    pub fn scoped<'pool, 'scope, F, R>(&'pool self, f: F) -> R
100    where
101        F: FnOnce(&Scope<'pool, 'scope>) -> R,
102    {
103        let wg = WaitGroup::new();
104        let scope = Scope { pool: self, wg, _marker: PhantomData };
105        let r = f(&scope);
106        scope.wg.wait();
107        r
108    }
109
110    #[must_use]
111    pub fn len(&self) -> usize {
112        self.extant_tasks.load(Ordering::SeqCst)
113    }
114
115    #[must_use]
116    pub fn is_empty(&self) -> bool {
117        self.len() == 0
118    }
119}
120
121pub struct Scope<'pool, 'scope> {
122    pool: &'pool Pool,
123    wg: WaitGroup,
124    _marker: PhantomData<fn(&'scope ()) -> &'scope ()>,
125}
126
127impl<'scope> Scope<'_, 'scope> {
128    pub fn spawn<F>(&self, intent: ThreadIntent, f: F)
129    where
130        F: 'scope + FnOnce() + Send + UnwindSafe,
131    {
132        let wg = self.wg.clone();
133        let f = Box::new(move || {
134            if cfg!(debug_assertions) {
135                intent.assert_is_used_on_current_thread();
136            }
137            f();
138            drop(wg);
139        });
140
141        let job = Job {
142            requested_intent: intent,
143            f: unsafe {
144                std::mem::transmute::<
145                    Box<dyn 'scope + FnOnce() + Send + UnwindSafe>,
146                    Box<dyn 'static + FnOnce() + Send + UnwindSafe>,
147                >(f)
148            },
149        };
150        self.pool.job_sender.send(job).unwrap();
151    }
152}