1use std::{
5 ffi::OsString,
6 fmt,
7 io::{self, BufWriter, Write},
8 marker::PhantomData,
9 path::PathBuf,
10 process::{ChildStderr, ChildStdout, Command, Stdio},
11};
12
13use crossbeam_channel::Sender;
14use paths::Utf8PathBuf;
15use process_wrap::std::{StdChildWrapper, StdCommandWrap};
16use stdx::process::streaming_output;
17
/// Converts the line-oriented output of a spawned command into messages of type `T`.
///
/// Implementors have to be `Send + 'static` because the parser is boxed and moved onto the
/// worker thread that reads the command's output.
pub(crate) trait JsonLinesParser<T>
where
    Self: Send + 'static,
{
    /// Tries to turn a single `line` of output into a message.
    ///
    /// Returns `None` when the line does not yield a message. `error` is a buffer shared
    /// across calls; presumably implementations append a description of lines they could not
    /// handle to it.
    fn from_line(&self, line: &str, error: &mut String) -> Option<T>;

    /// Optionally yields one more message that is not tied to any particular line.
    ///
    /// NOTE(review): judging by the name this is meant to run once the output has been
    /// exhausted; confirm against the reader that invokes it.
    fn from_eof(&self) -> Option<T>;
}
27
28struct CommandActor<T> {
29 parser: Box<dyn JsonLinesParser<T>>,
30 sender: Sender<T>,
31 stdout: ChildStdout,
32 stderr: ChildStderr,
33}
34
35impl<T: Sized + Send + 'static> CommandActor<T> {
36 fn new(
37 parser: impl JsonLinesParser<T>,
38 sender: Sender<T>,
39 stdout: ChildStdout,
40 stderr: ChildStderr,
41 ) -> Self {
42 let parser = Box::new(parser);
43 CommandActor { parser, sender, stdout, stderr }
44 }
45}
46
47impl<T: Sized + Send + 'static> CommandActor<T> {
48 fn run(self, outfile: Option<Utf8PathBuf>) -> io::Result<(bool, String)> {
49 let mut stdout = outfile.as_ref().and_then(|path| {
59 _ = std::fs::create_dir_all(path);
60 Some(BufWriter::new(std::fs::File::create(path.join("stdout")).ok()?))
61 });
62 let mut stderr = outfile.as_ref().and_then(|path| {
63 _ = std::fs::create_dir_all(path);
64 Some(BufWriter::new(std::fs::File::create(path.join("stderr")).ok()?))
65 });
66
67 let mut stdout_errors = String::new();
68 let mut stderr_errors = String::new();
69 let mut read_at_least_one_stdout_message = false;
70 let mut read_at_least_one_stderr_message = false;
71 let process_line = |line: &str, error: &mut String| {
72 if let Some(t) = self.parser.from_line(line, error) {
74 self.sender.send(t).unwrap();
75 true
76 } else {
77 false
78 }
79 };
80 let output = streaming_output(
81 self.stdout,
82 self.stderr,
83 &mut |line| {
84 if let Some(stdout) = &mut stdout {
85 _ = stdout.write_all(line.as_bytes());
86 _ = stdout.write_all(b"\n");
87 }
88 if process_line(line, &mut stdout_errors) {
89 read_at_least_one_stdout_message = true;
90 }
91 },
92 &mut |line| {
93 if let Some(stderr) = &mut stderr {
94 _ = stderr.write_all(line.as_bytes());
95 _ = stderr.write_all(b"\n");
96 }
97 if process_line(line, &mut stderr_errors) {
98 read_at_least_one_stderr_message = true;
99 }
100 },
101 &mut || {
102 if let Some(t) = self.parser.from_eof() {
103 self.sender.send(t).unwrap();
104 }
105 },
106 );
107
108 let read_at_least_one_message =
109 read_at_least_one_stdout_message || read_at_least_one_stderr_message;
110 let mut error = stdout_errors;
111 error.push_str(&stderr_errors);
112 match output {
113 Ok(_) => Ok((read_at_least_one_message, error)),
114 Err(e) => Err(io::Error::new(e.kind(), format!("{e:?}: {error}"))),
115 }
116 }
117}
118
119struct JodGroupChild(Box<dyn StdChildWrapper>);
123
124impl Drop for JodGroupChild {
125 fn drop(&mut self) {
126 _ = self.0.kill();
127 _ = self.0.wait();
128 }
129}
130
131pub(crate) struct CommandHandle<T> {
133 child: JodGroupChild,
136 thread: stdx::thread::JoinHandle<io::Result<(bool, String)>>,
137 program: OsString,
138 arguments: Vec<OsString>,
139 current_dir: Option<PathBuf>,
140 _phantom: PhantomData<T>,
141}
142
143impl<T> fmt::Debug for CommandHandle<T> {
144 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
145 f.debug_struct("CommandHandle")
146 .field("program", &self.program)
147 .field("arguments", &self.arguments)
148 .field("current_dir", &self.current_dir)
149 .finish()
150 }
151}
152
153impl<T: Sized + Send + 'static> CommandHandle<T> {
154 pub(crate) fn spawn(
155 mut command: Command,
156 parser: impl JsonLinesParser<T>,
157 sender: Sender<T>,
158 out_file: Option<Utf8PathBuf>,
159 ) -> std::io::Result<Self> {
160 command.stdout(Stdio::piped()).stderr(Stdio::piped()).stdin(Stdio::null());
161
162 let program = command.get_program().into();
163 let arguments = command.get_args().map(|arg| arg.into()).collect::<Vec<OsString>>();
164 let current_dir = command.get_current_dir().map(|arg| arg.to_path_buf());
165
166 let mut child = StdCommandWrap::from(command);
167 #[cfg(unix)]
168 child.wrap(process_wrap::std::ProcessSession);
169 #[cfg(windows)]
170 child.wrap(process_wrap::std::JobObject);
171 let mut child = child.spawn().map(JodGroupChild)?;
172
173 let stdout = child.0.stdout().take().unwrap();
174 let stderr = child.0.stderr().take().unwrap();
175
176 let actor = CommandActor::<T>::new(parser, sender, stdout, stderr);
177 let thread =
178 stdx::thread::Builder::new(stdx::thread::ThreadIntent::Worker, "CommandHandle")
179 .spawn(move || actor.run(out_file))
180 .expect("failed to spawn thread");
181 Ok(CommandHandle { program, arguments, current_dir, child, thread, _phantom: PhantomData })
182 }
183
184 pub(crate) fn cancel(mut self) {
185 let _ = self.child.0.kill();
186 let _ = self.child.0.wait();
187 }
188
189 pub(crate) fn join(mut self) -> io::Result<()> {
190 let exit_status = self.child.0.wait()?;
191 let (read_at_least_one_message, error) = self.thread.join()?;
192 if read_at_least_one_message || exit_status.success() {
193 Ok(())
194 } else {
195 Err(io::Error::other(format!(
196 "Cargo watcher failed, the command produced no valid metadata (exit code: {exit_status:?}):\n{error}"
197 )))
198 }
199 }
200
201 pub(crate) fn has_exited(&mut self) -> bool {
202 match self.child.0.try_wait() {
203 Ok(Some(_exit_code)) => {
204 true
206 }
207 Ok(None) => {
208 false
210 }
211 Err(_) => {
212 true
215 }
216 }
217 }
218}