stdx/
process.rs

1//! Read both stdout and stderr of child without deadlocks.
2//!
3//! <https://github.com/rust-lang/cargo/blob/905af549966f23a9288e9993a85d1249a5436556/crates/cargo-util/src/read2.rs>
4//! <https://github.com/rust-lang/cargo/blob/58a961314437258065e23cb6316dfc121d96fb71/crates/cargo-util/src/process_builder.rs#L231>
5
6use std::{
7    io,
8    process::{ChildStderr, ChildStdout, Command, Output, Stdio},
9};
10
11use crate::JodChild;
12
13pub fn streaming_output(
14    out: ChildStdout,
15    err: ChildStderr,
16    on_stdout_line: &mut dyn FnMut(&str),
17    on_stderr_line: &mut dyn FnMut(&str),
18    on_eof: &mut dyn FnMut(),
19) -> io::Result<(Vec<u8>, Vec<u8>)> {
20    let mut stdout = Vec::new();
21    let mut stderr = Vec::new();
22
23    imp::read2(out, err, &mut |is_out, data, eof| {
24        let idx = if eof {
25            data.len()
26        } else {
27            match data.iter().rposition(|&b| b == b'\n') {
28                Some(i) => i + 1,
29                None => return,
30            }
31        };
32        {
33            // scope for new_lines
34            let new_lines = {
35                let dst = if is_out { &mut stdout } else { &mut stderr };
36                let start = dst.len();
37                let data = data.drain(..idx);
38                dst.extend(data);
39                &dst[start..]
40            };
41            for line in String::from_utf8_lossy(new_lines).lines() {
42                if is_out {
43                    on_stdout_line(line);
44                } else {
45                    on_stderr_line(line);
46                }
47            }
48            if eof {
49                on_eof();
50            }
51        }
52    })?;
53
54    Ok((stdout, stderr))
55}
56
57/// # Panics
58///
59/// Panics if `cmd` is not configured to have `stdout` and `stderr` as `piped`.
60pub fn spawn_with_streaming_output(
61    mut cmd: Command,
62    on_stdout_line: &mut dyn FnMut(&str),
63    on_stderr_line: &mut dyn FnMut(&str),
64) -> io::Result<Output> {
65    let cmd = cmd.stdout(Stdio::piped()).stderr(Stdio::piped()).stdin(Stdio::null());
66
67    let mut child = JodChild(cmd.spawn()?);
68    let (stdout, stderr) = streaming_output(
69        child.stdout.take().unwrap(),
70        child.stderr.take().unwrap(),
71        on_stdout_line,
72        on_stderr_line,
73        &mut || (),
74    )?;
75    let status = child.wait()?;
76    Ok(Output { status, stdout, stderr })
77}
78
79#[cfg(unix)]
80mod imp {
81    use std::{
82        io::{self, prelude::*},
83        mem,
84        os::unix::prelude::*,
85        process::{ChildStderr, ChildStdout},
86    };
87
88    pub(crate) fn read2(
89        mut out_pipe: ChildStdout,
90        mut err_pipe: ChildStderr,
91        data: &mut dyn FnMut(bool, &mut Vec<u8>, bool),
92    ) -> io::Result<()> {
93        unsafe {
94            libc::fcntl(out_pipe.as_raw_fd(), libc::F_SETFL, libc::O_NONBLOCK);
95            libc::fcntl(err_pipe.as_raw_fd(), libc::F_SETFL, libc::O_NONBLOCK);
96        }
97
98        let mut out_done = false;
99        let mut err_done = false;
100        let mut out = Vec::new();
101        let mut err = Vec::new();
102
103        let mut fds: [libc::pollfd; 2] = unsafe { mem::zeroed() };
104        fds[0].fd = out_pipe.as_raw_fd();
105        fds[0].events = libc::POLLIN;
106        fds[1].fd = err_pipe.as_raw_fd();
107        fds[1].events = libc::POLLIN;
108        let mut nfds = 2;
109        let mut errfd = 1;
110
111        while nfds > 0 {
112            // wait for either pipe to become readable using `select`
113            let r = unsafe { libc::poll(fds.as_mut_ptr(), nfds, -1) };
114            if r == -1 {
115                let err = io::Error::last_os_error();
116                if err.kind() == io::ErrorKind::Interrupted {
117                    continue;
118                }
119                return Err(err);
120            }
121
122            // Read as much as we can from each pipe, ignoring EWOULDBLOCK or
123            // EAGAIN. If we hit EOF, then this will happen because the underlying
124            // reader will return Ok(0), in which case we'll see `Ok` ourselves. In
125            // this case we flip the other fd back into blocking mode and read
126            // whatever's leftover on that file descriptor.
127            let handle = |res: io::Result<_>| match res {
128                Ok(_) => Ok(true),
129                Err(e) => {
130                    if e.kind() == io::ErrorKind::WouldBlock {
131                        Ok(false)
132                    } else {
133                        Err(e)
134                    }
135                }
136            };
137            if !err_done && fds[errfd].revents != 0 && handle(err_pipe.read_to_end(&mut err))? {
138                err_done = true;
139                nfds -= 1;
140            }
141            data(false, &mut err, err_done);
142            if !out_done && fds[0].revents != 0 && handle(out_pipe.read_to_end(&mut out))? {
143                out_done = true;
144                fds[0].fd = err_pipe.as_raw_fd();
145                errfd = 0;
146                nfds -= 1;
147            }
148            data(true, &mut out, out_done);
149        }
150        Ok(())
151    }
152}
153
#[cfg(windows)]
mod imp {
    use std::{
        io,
        os::windows::prelude::*,
        process::{ChildStderr, ChildStdout},
        slice,
    };

    use miow::{
        Overlapped,
        iocp::{CompletionPort, CompletionStatus},
        pipe::NamedPipe,
    };
    use windows_sys::Win32::Foundation::ERROR_BROKEN_PIPE;

    /// One child pipe together with the state of its overlapped read.
    struct Pipe<'a> {
        /// Bytes received so far that the callback has not drained yet.
        dst: &'a mut Vec<u8>,
        overlapped: Overlapped,
        pipe: NamedPipe,
        /// Set once the pipe is known to have reached end-of-stream.
        done: bool,
    }

    /// Reads both pipes until each has reached end-of-stream, using an I/O completion port.
    ///
    /// `data(is_stdout, buffer, eof)` is called after every completed read. It is also
    /// called once with `eof == true` for each pipe when its end is detected, whether the
    /// end shows up as a zero-byte completion or as `ERROR_BROKEN_PIPE` while issuing the
    /// next read. Without the latter call, bytes still waiting in `buffer` (for example an
    /// unterminated last line) would never be handed to the callback.
    pub(crate) fn read2(
        out_pipe: ChildStdout,
        err_pipe: ChildStderr,
        data: &mut dyn FnMut(bool, &mut Vec<u8>, bool),
    ) -> io::Result<()> {
        let mut out = Vec::new();
        let mut err = Vec::new();

        // Token 0 identifies stdout completions, token 1 stderr completions.
        let port = CompletionPort::new(1)?;
        port.add_handle(0, &out_pipe)?;
        port.add_handle(1, &err_pipe)?;

        unsafe {
            let mut out_pipe = Pipe::new(out_pipe, &mut out);
            let mut err_pipe = Pipe::new(err_pipe, &mut err);

            // Kick off the first read on each pipe. A pipe that is already broken will
            // never produce a completion, so its EOF has to be reported right here.
            out_pipe.read()?;
            if out_pipe.done {
                data(true, &mut *out_pipe.dst, true);
            }
            err_pipe.read()?;
            if err_pipe.done {
                data(false, &mut *err_pipe.dst, true);
            }

            let mut status = [CompletionStatus::zero(), CompletionStatus::zero()];

            while !out_pipe.done || !err_pipe.done {
                for status in port.get_many(&mut status, None)? {
                    let (pipe, is_out) = if status.token() == 0 {
                        (&mut out_pipe, true)
                    } else {
                        (&mut err_pipe, false)
                    };

                    pipe.complete(status);
                    let eof_reported = pipe.done;
                    data(is_out, &mut *pipe.dst, eof_reported);

                    // Queue the next read. If that is what discovers the end of the
                    // stream, report EOF now: no further completion will arrive.
                    pipe.read()?;
                    if pipe.done && !eof_reported {
                        data(is_out, &mut *pipe.dst, true);
                    }
                }
            }

            Ok(())
        }
    }

    impl<'a> Pipe<'a> {
        /// Wraps the raw handle of `p` for overlapped reads into `dst`.
        ///
        /// # Safety
        ///
        /// The handle is reinterpreted as a `NamedPipe`; `read2` additionally assumes it
        /// can be used for overlapped I/O.
        unsafe fn new<P: IntoRawHandle>(p: P, dst: &'a mut Vec<u8>) -> Pipe<'a> {
            let pipe = unsafe { NamedPipe::from_raw_handle(p.into_raw_handle()) };
            Pipe { dst, pipe, overlapped: Overlapped::zero(), done: false }
        }

        /// Starts an overlapped read into the spare capacity of `dst`.
        ///
        /// `ERROR_BROKEN_PIPE` is treated as end-of-stream: `done` is set and `Ok(())`
        /// is returned. Any other error is propagated.
        ///
        /// # Safety
        ///
        /// `self` (its `dst` buffer and `overlapped` structure) must stay alive and
        /// unmoved until the read has completed.
        unsafe fn read(&mut self) -> io::Result<()> {
            let dst = unsafe { slice_to_end(self.dst) };
            match unsafe { self.pipe.read_overlapped(dst, self.overlapped.raw()) } {
                Ok(_) => Ok(()),
                Err(e) => {
                    if e.raw_os_error() == Some(ERROR_BROKEN_PIPE as i32) {
                        self.done = true;
                        Ok(())
                    } else {
                        Err(e)
                    }
                }
            }
        }

        /// Accounts for a finished read: grows `dst` by the number of bytes transferred
        /// and marks the pipe as done when that number is zero.
        ///
        /// # Safety
        ///
        /// `status` must describe the read previously issued by `read` on this pipe, so
        /// that the bytes being exposed were actually written into `dst`'s spare capacity.
        unsafe fn complete(&mut self, status: &CompletionStatus) {
            let prev = self.dst.len();
            unsafe { self.dst.set_len(prev + status.bytes_transferred() as usize) };
            if status.bytes_transferred() == 0 {
                self.done = true;
            }
        }
    }

    /// Returns the unused capacity of `v` as a mutable slice, growing `v` first if it has
    /// no spare room.
    ///
    /// # Safety
    ///
    /// The returned slice covers uninitialized memory; it must only be written to, and
    /// `v`'s length may only be extended over bytes that were actually written.
    unsafe fn slice_to_end(v: &mut Vec<u8>) -> &mut [u8] {
        if v.capacity() == 0 {
            v.reserve(16);
        }
        if v.capacity() == v.len() {
            v.reserve(1);
        }
        let data = unsafe { v.as_mut_ptr().add(v.len()) };
        let len = v.capacity() - v.len();
        unsafe { slice::from_raw_parts_mut(data, len) }
    }
}
258
#[cfg(target_arch = "wasm32")]
mod imp {
    use std::io;
    use std::process::{ChildStderr, ChildStdout};

    /// Stand-in for `read2` on wasm32, where the other implementations are not compiled.
    ///
    /// # Panics
    ///
    /// Always panics.
    pub(crate) fn read2(
        _out_pipe: ChildStdout,
        _err_pipe: ChildStderr,
        _data: &mut dyn FnMut(bool, &mut Vec<u8>, bool),
    ) -> io::Result<()> {
        panic!("no processes on wasm")
    }
}