1use std::{
11 marker::PhantomData,
12 panic::{self, UnwindSafe},
13 sync::{
14 Arc,
15 atomic::{AtomicUsize, Ordering},
16 },
17};
18
19use crossbeam_channel::{Receiver, Sender};
20use crossbeam_utils::sync::WaitGroup;
21
22use crate::thread::{Builder, JoinHandle, ThreadIntent};
23
24pub struct Pool {
25 job_sender: Sender<Job>,
33 _handles: Box<[JoinHandle]>,
34 extant_tasks: Arc<AtomicUsize>,
35}
36
37struct Job {
38 requested_intent: ThreadIntent,
39 f: Box<dyn FnOnce() + Send + UnwindSafe + 'static>,
40}
41
42impl Pool {
43 #[must_use]
47 pub fn new(threads: usize) -> Self {
48 const STACK_SIZE: usize = 8 * 1024 * 1024;
49 const INITIAL_INTENT: ThreadIntent = ThreadIntent::Worker;
50
51 let (job_sender, job_receiver) = crossbeam_channel::unbounded();
52 let extant_tasks = Arc::new(AtomicUsize::new(0));
53
54 let mut handles = Vec::with_capacity(threads);
55 for idx in 0..threads {
56 let handle = Builder::new(INITIAL_INTENT, format!("Worker{idx}",))
57 .stack_size(STACK_SIZE)
58 .allow_leak(true)
59 .spawn({
60 let extant_tasks = Arc::clone(&extant_tasks);
61 let job_receiver: Receiver<Job> = job_receiver.clone();
62 move || {
63 let mut current_intent = INITIAL_INTENT;
64 for job in job_receiver {
65 if job.requested_intent != current_intent {
66 job.requested_intent.apply_to_current_thread();
67 current_intent = job.requested_intent;
68 }
69 extant_tasks.fetch_add(1, Ordering::SeqCst);
70 drop(panic::catch_unwind(job.f));
72 extant_tasks.fetch_sub(1, Ordering::SeqCst);
73 }
74 }
75 })
76 .expect("failed to spawn thread");
77
78 handles.push(handle);
79 }
80
81 Self { _handles: handles.into_boxed_slice(), extant_tasks, job_sender }
82 }
83
84 pub fn spawn<F>(&self, intent: ThreadIntent, f: F)
85 where
86 F: FnOnce() + Send + UnwindSafe + 'static,
87 {
88 let f = Box::new(move || {
89 if cfg!(debug_assertions) {
90 intent.assert_is_used_on_current_thread();
91 }
92 f();
93 });
94
95 let job = Job { requested_intent: intent, f };
96 self.job_sender.send(job).unwrap();
97 }
98
99 pub fn scoped<'pool, 'scope, F, R>(&'pool self, f: F) -> R
100 where
101 F: FnOnce(&Scope<'pool, 'scope>) -> R,
102 {
103 let wg = WaitGroup::new();
104 let scope = Scope { pool: self, wg, _marker: PhantomData };
105 let r = f(&scope);
106 scope.wg.wait();
107 r
108 }
109
110 #[must_use]
111 pub fn len(&self) -> usize {
112 self.extant_tasks.load(Ordering::SeqCst)
113 }
114
115 #[must_use]
116 pub fn is_empty(&self) -> bool {
117 self.len() == 0
118 }
119}
120
121pub struct Scope<'pool, 'scope> {
122 pool: &'pool Pool,
123 wg: WaitGroup,
124 _marker: PhantomData<fn(&'scope ()) -> &'scope ()>,
125}
126
127impl<'scope> Scope<'_, 'scope> {
128 pub fn spawn<F>(&self, intent: ThreadIntent, f: F)
129 where
130 F: 'scope + FnOnce() + Send + UnwindSafe,
131 {
132 let wg = self.wg.clone();
133 let f = Box::new(move || {
134 if cfg!(debug_assertions) {
135 intent.assert_is_used_on_current_thread();
136 }
137 f();
138 drop(wg);
139 });
140
141 let job = Job {
142 requested_intent: intent,
143 f: unsafe {
144 std::mem::transmute::<
145 Box<dyn 'scope + FnOnce() + Send + UnwindSafe>,
146 Box<dyn 'static + FnOnce() + Send + UnwindSafe>,
147 >(f)
148 },
149 };
150 self.pool.job_sender.send(job).unwrap();
151 }
152}