diff --git a/Cargo.toml b/Cargo.toml
index 802c9905e..e16186069 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,10 +22,10 @@ rust-version = "1.53"
 [target.'cfg(unix)'.dependencies]
 # Don't turn on the feature "std" for this, see https://github.com/rust-lang/cargo/issues/4866
 # which is still an issue with `resolver = "1"`.
-libc = { version = "0.2.62", default-features = false }
+libc = { version = "0.2.62", default-features = false, optional = true }
 
 [features]
-parallel = []
+parallel = ["libc"]
 
 [dev-dependencies]
 tempfile = "3"
diff --git a/dev-tools/gen-windows-sys-binding/windows_sys.list b/dev-tools/gen-windows-sys-binding/windows_sys.list
index f5ed55e16..ec8b02793 100644
--- a/dev-tools/gen-windows-sys-binding/windows_sys.list
+++ b/dev-tools/gen-windows-sys-binding/windows_sys.list
@@ -1,5 +1,4 @@
 Windows.Win32.Foundation.FILETIME
-Windows.Win32.Foundation.INVALID_HANDLE_VALUE
 Windows.Win32.Foundation.ERROR_NO_MORE_ITEMS
 Windows.Win32.Foundation.ERROR_SUCCESS
 Windows.Win32.Foundation.SysFreeString
@@ -20,7 +19,7 @@ Windows.Win32.System.Com.COINIT_MULTITHREADED
 Windows.Win32.System.Com.CoCreateInstance
 Windows.Win32.System.Com.CoInitializeEx
 
-Windows.Win32.System.Pipes.CreatePipe
+Windows.Win32.System.Pipes.PeekNamedPipe
 
 Windows.Win32.System.Registry.RegCloseKey
 Windows.Win32.System.Registry.RegEnumKeyExW
diff --git a/src/command_helpers.rs b/src/command_helpers.rs
index 3b3e923df..a818b0493 100644
--- a/src/command_helpers.rs
+++ b/src/command_helpers.rs
@@ -4,13 +4,12 @@ use std::{
     collections::hash_map,
     ffi::OsString,
     fmt::Display,
-    fs::{self, File},
+    fs,
     hash::Hasher,
-    io::{self, BufRead, BufReader, Read, Write},
+    io::{self, Read, Write},
     path::Path,
-    process::{Child, Command, Stdio},
+    process::{Child, ChildStderr, Command, Stdio},
     sync::Arc,
-    thread::{self, JoinHandle},
 };
 
 use crate::{Error, ErrorKind, Object};
@@ -41,83 +40,157 @@ impl CargoOutput {
         }
     }
 
-    pub(crate) fn print_thread(&self) -> Result<Option<PrintThread>, Error> {
-        self.warnings.then(PrintThread::new).transpose()
+    fn stdio_for_warnings(&self) -> Stdio {
+        if self.warnings {
+            Stdio::piped()
+        } else {
+            Stdio::null()
+        }
     }
 }
 
-pub(crate) struct PrintThread {
-    handle: Option<JoinHandle<()>>,
-    pipe_writer: Option<File>,
+pub(crate) struct StderrForwarder {
+    inner: Option<(ChildStderr, Vec<u8>)>,
+    #[cfg(feature = "parallel")]
+    is_non_blocking: bool,
+    #[cfg(feature = "parallel")]
+    bytes_available_failed: bool,
 }
 
-impl PrintThread {
-    pub(crate) fn new() -> Result<Self, Error> {
-        let (pipe_reader, pipe_writer) = crate::os_pipe::pipe()?;
-
-        // Capture the standard error coming from compilation, and write it out
-        // with cargo:warning= prefixes. Note that this is a bit wonky to avoid
-        // requiring the output to be UTF-8, we instead just ship bytes from one
-        // location to another.
-        let print = thread::spawn(move || {
-            let mut stderr = BufReader::with_capacity(4096, pipe_reader);
-            let mut line = Vec::with_capacity(20);
-            let stdout = io::stdout();
-
-            // read_until returns 0 on Eof
-            while stderr.read_until(b'\n', &mut line).unwrap() != 0 {
-                {
-                    let mut stdout = stdout.lock();
-
-                    stdout.write_all(b"cargo:warning=").unwrap();
-                    stdout.write_all(&line).unwrap();
-                    stdout.write_all(b"\n").unwrap();
-                }
+const MIN_BUFFER_CAPACITY: usize = 100;
 
-                // read_until does not clear the buffer
-                line.clear();
-            }
-        });
+impl StderrForwarder {
+    pub(crate) fn new(child: &mut Child) -> Self {
+        Self {
+            inner: child
+                .stderr
+                .take()
+                .map(|stderr| (stderr, Vec::with_capacity(MIN_BUFFER_CAPACITY))),
+            #[cfg(feature = "parallel")]
+            is_non_blocking: false,
+            #[cfg(feature = "parallel")]
+            bytes_available_failed: false,
+        }
+    }
 
-        Ok(Self {
-            handle: Some(print),
-            pipe_writer: Some(pipe_writer),
-        })
+    #[allow(clippy::uninit_vec)]
+    fn forward_available(&mut self) -> bool {
+        if let Some((stderr, buffer)) = self.inner.as_mut() {
+            loop {
+                let old_data_end = buffer.len();
+
+                // For non-blocking we check to see if there is data available, so we should try to
+                // read at least that much. For blocking, always read at least the minimum amount.
+                #[cfg(not(feature = "parallel"))]
+                let to_reserve = MIN_BUFFER_CAPACITY;
+                #[cfg(feature = "parallel")]
+                let to_reserve = if self.is_non_blocking && !self.bytes_available_failed {
+                    match crate::parallel::stderr::bytes_available(stderr) {
+                        #[cfg(windows)]
+                        Ok(0) => return false,
+                        #[cfg(windows)]
+                        Err(_) => {
+                            // On Windows, if we get an error then the pipe is broken, so flush
+                            // the buffer and bail.
+                            if !buffer.is_empty() {
+                                write_warning(&buffer[..]);
+                            }
+                            self.inner = None;
+                            return true;
+                        }
+                        #[cfg(unix)]
+                        Err(_) | Ok(0) => {
+                            // On Unix, depending on the implementation, we may get spurious
+                            // errors so make a note not to use bytes_available again and try
+                            // the non-blocking read anyway.
+                            self.bytes_available_failed = true;
+                            MIN_BUFFER_CAPACITY
+                        }
+                        Ok(bytes_available) => MIN_BUFFER_CAPACITY.max(bytes_available),
+                    }
+                } else {
+                    MIN_BUFFER_CAPACITY
+                };
+                buffer.reserve(to_reserve);
+
+                // SAFETY: 1) the length is set to the capacity, so we are never using memory beyond
+                // the underlying buffer and 2) we always call `truncate` below to set the len back
+                // to the intitialized data.
+                unsafe {
+                    buffer.set_len(buffer.capacity());
+                }
+                match stderr.read(&mut buffer[old_data_end..]) {
+                    Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
+                        // No data currently, yield back.
+                        buffer.truncate(old_data_end);
+                        return false;
+                    }
+                    Err(err) if err.kind() == std::io::ErrorKind::Interrupted => {
+                        // Interrupted, try again.
+                        buffer.truncate(old_data_end);
+                    }
+                    Ok(0) | Err(_) => {
+                        // End of stream: flush remaining data and bail.
+                        if old_data_end > 0 {
+                            write_warning(&buffer[..old_data_end]);
+                        }
+                        self.inner = None;
+                        return true;
+                    }
+                    Ok(bytes_read) => {
+                        buffer.truncate(old_data_end + bytes_read);
+                        let mut consumed = 0;
+                        for line in buffer.split_inclusive(|&b| b == b'\n') {
+                            // Only forward complete lines, leave the rest in the buffer.
+                            if let Some((b'\n', line)) = line.split_last() {
+                                consumed += line.len() + 1;
+                                write_warning(line);
+                            }
+                        }
+                        buffer.drain(..consumed);
+                    }
+                }
+            }
+        } else {
+            true
+        }
     }
 
-    /// # Panics
-    ///
-    /// Will panic if the pipe writer has already been taken.
-    pub(crate) fn take_pipe_writer(&mut self) -> File {
-        self.pipe_writer.take().unwrap()
+    #[cfg(feature = "parallel")]
+    pub(crate) fn set_non_blocking(&mut self) -> Result<(), Error> {
+        assert!(!self.is_non_blocking);
+
+        if let Some((stderr, _)) = self.inner.as_mut() {
+            crate::parallel::stderr::set_non_blocking(stderr)?;
+        }
+
+        self.is_non_blocking = true;
+        Ok(())
     }
 
-    /// # Panics
-    ///
-    /// Will panic if the pipe writer has already been taken.
-    pub(crate) fn clone_pipe_writer(&self) -> Result<File, Error> {
-        self.try_clone_pipe_writer().map(Option::unwrap)
+    #[cfg(feature = "parallel")]
+    fn forward_all(&mut self) {
+        while !self.forward_available() {}
     }
 
-    pub(crate) fn try_clone_pipe_writer(&self) -> Result<Option<File>, Error> {
-        self.pipe_writer
-            .as_ref()
-            .map(File::try_clone)
-            .transpose()
-            .map_err(From::from)
+    #[cfg(not(feature = "parallel"))]
+    fn forward_all(&mut self) {
+        let forward_result = self.forward_available();
+        assert!(forward_result, "Should have consumed all data");
     }
 }
 
-impl Drop for PrintThread {
-    fn drop(&mut self) {
-        // Drop pipe_writer first to avoid deadlock
-        self.pipe_writer.take();
-
-        self.handle.take().unwrap().join().unwrap();
-    }
+fn write_warning(line: &[u8]) {
+    let stdout = io::stdout();
+    let mut stdout = stdout.lock();
+    stdout.write_all(b"cargo:warning=").unwrap();
+    stdout.write_all(line).unwrap();
+    stdout.write_all(b"\n").unwrap();
 }
 
 fn wait_on_child(cmd: &Command, program: &str, child: &mut Child) -> Result<(), Error> {
+    StderrForwarder::new(child).forward_all();
+
     let status = match child.wait() {
         Ok(s) => s,
         Err(e) => {
@@ -193,20 +266,13 @@ pub(crate) fn objects_from_files(files: &[Arc<Path>], dst: &Path) -> Result<Vec<
     Ok(objects)
 }
 
-fn run_inner(cmd: &mut Command, program: &str, pipe_writer: Option<File>) -> Result<(), Error> {
-    let mut child = spawn(cmd, program, pipe_writer)?;
-    wait_on_child(cmd, program, &mut child)
-}
-
 pub(crate) fn run(
     cmd: &mut Command,
     program: &str,
-    print: Option<&PrintThread>,
+    cargo_output: &CargoOutput,
 ) -> Result<(), Error> {
-    let pipe_writer = print.map(PrintThread::clone_pipe_writer).transpose()?;
-    run_inner(cmd, program, pipe_writer)?;
-
-    Ok(())
+    let mut child = spawn(cmd, program, cargo_output)?;
+    wait_on_child(cmd, program, &mut child)
 }
 
 pub(crate) fn run_output(
@@ -216,12 +282,7 @@ pub(crate) fn run_output(
 ) -> Result<Vec<u8>, Error> {
     cmd.stdout(Stdio::piped());
 
-    let mut print = cargo_output.print_thread()?;
-    let mut child = spawn(
-        cmd,
-        program,
-        print.as_mut().map(PrintThread::take_pipe_writer),
-    )?;
+    let mut child = spawn(cmd, program, cargo_output)?;
 
     let mut stdout = vec![];
     child
@@ -239,7 +300,7 @@ pub(crate) fn run_output(
 pub(crate) fn spawn(
     cmd: &mut Command,
     program: &str,
-    pipe_writer: Option<File>,
+    cargo_output: &CargoOutput,
 ) -> Result<Child, Error> {
     struct ResetStderr<'cmd>(&'cmd mut Command);
 
@@ -254,10 +315,7 @@ pub(crate) fn spawn(
     println!("running: {:?}", cmd);
 
     let cmd = ResetStderr(cmd);
-    let child = cmd
-        .0
-        .stderr(pipe_writer.map_or_else(Stdio::null, Stdio::from))
-        .spawn();
+    let child = cmd.0.stderr(cargo_output.stdio_for_warnings()).spawn();
     match child {
         Ok(child) => Ok(child),
         Err(ref e) if e.kind() == io::ErrorKind::NotFound => {
@@ -307,9 +365,14 @@ pub(crate) fn try_wait_on_child(
     program: &str,
     child: &mut Child,
     stdout: &mut dyn io::Write,
+    stderr_forwarder: &mut StderrForwarder,
 ) -> Result<Option<()>, Error> {
+    stderr_forwarder.forward_available();
+
     match child.try_wait() {
         Ok(Some(status)) => {
+            stderr_forwarder.forward_all();
+
             let _ = writeln!(stdout, "{}", status);
 
             if status.success() {
@@ -325,12 +388,15 @@ pub(crate) fn try_wait_on_child(
             }
         }
         Ok(None) => Ok(None),
-        Err(e) => Err(Error::new(
-            ErrorKind::ToolExecError,
-            format!(
-                "Failed to wait on spawned child process, command {:?} with args {:?}: {}.",
-                cmd, program, e
-            ),
-        )),
+        Err(e) => {
+            stderr_forwarder.forward_all();
+            Err(Error::new(
+                ErrorKind::ToolExecError,
+                format!(
+                    "Failed to wait on spawned child process, command {:?} with args {:?}: {}.",
+                    cmd, program, e
+                ),
+            ))
+        }
     }
 }
diff --git a/src/lib.rs b/src/lib.rs
index c3237dd4b..a76d5321a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -66,7 +66,6 @@ use std::process::Child;
 use std::process::Command;
 use std::sync::{Arc, Mutex};
 
-mod os_pipe;
 #[cfg(feature = "parallel")]
 mod parallel;
 mod windows;
@@ -1042,10 +1041,9 @@ impl Build {
         let dst = self.get_out_dir()?;
 
         let objects = objects_from_files(&self.files, &dst)?;
-        let print = self.cargo_output.print_thread()?;
 
-        self.compile_objects(&objects, print.as_ref())?;
-        self.assemble(lib_name, &dst.join(gnu_lib_name), &objects, print.as_ref())?;
+        self.compile_objects(&objects)?;
+        self.assemble(lib_name, &dst.join(gnu_lib_name), &objects)?;
 
         if self.get_target()?.contains("msvc") {
             let compiler = self.get_base_compiler()?;
@@ -1207,15 +1205,14 @@ impl Build {
     pub fn try_compile_intermediates(&self) -> Result<Vec<PathBuf>, Error> {
         let dst = self.get_out_dir()?;
         let objects = objects_from_files(&self.files, &dst)?;
-        let print = self.cargo_output.print_thread()?;
 
-        self.compile_objects(&objects, print.as_ref())?;
+        self.compile_objects(&objects)?;
 
         Ok(objects.into_iter().map(|v| v.dst).collect())
     }
 
     #[cfg(feature = "parallel")]
-    fn compile_objects(&self, objs: &[Object], print: Option<&PrintThread>) -> Result<(), Error> {
+    fn compile_objects(&self, objs: &[Object]) -> Result<(), Error> {
         use std::cell::Cell;
 
         use parallel::async_executor::{block_on, YieldOnce};
@@ -1223,7 +1220,7 @@ impl Build {
         if objs.len() <= 1 {
             for obj in objs {
                 let (mut cmd, name) = self.create_compile_object_cmd(obj)?;
-                run(&mut cmd, &name, print)?;
+                run(&mut cmd, &name, &self.cargo_output)?;
             }
 
             return Ok(());
@@ -1280,7 +1277,13 @@ impl Build {
                     parallel::retain_unordered_mut(
                         &mut pendings,
                         |(cmd, program, child, _token)| {
-                            match try_wait_on_child(cmd, program, &mut child.0, &mut stdout) {
+                            match try_wait_on_child(
+                                cmd,
+                                program,
+                                &mut child.0,
+                                &mut stdout,
+                                &mut child.1,
+                            ) {
                                 Ok(Some(())) => {
                                     // Task done, remove the entry
                                     has_made_progress.set(true);
@@ -1328,11 +1331,12 @@ impl Build {
                         YieldOnce::default().await
                     }
                 };
-                let pipe_writer = print.map(PrintThread::clone_pipe_writer).transpose()?;
-                let child = spawn(&mut cmd, &program, pipe_writer)?;
+                let mut child = spawn(&mut cmd, &program, &self.cargo_output)?;
+                let mut stderr_forwarder = StderrForwarder::new(&mut child);
+                stderr_forwarder.set_non_blocking()?;
 
                 cell_update(&pendings, |mut pendings| {
-                    pendings.push((cmd, program, KillOnDrop(child), token));
+                    pendings.push((cmd, program, KillOnDrop(child, stderr_forwarder), token));
                     pendings
                 });
 
@@ -1345,7 +1349,7 @@ impl Build {
 
         return block_on(wait_future, spawn_future, &has_made_progress);
 
-        struct KillOnDrop(Child);
+        struct KillOnDrop(Child, StderrForwarder);
 
         impl Drop for KillOnDrop {
             fn drop(&mut self) {
@@ -1367,10 +1371,10 @@ impl Build {
     }
 
     #[cfg(not(feature = "parallel"))]
-    fn compile_objects(&self, objs: &[Object], print: Option<&PrintThread>) -> Result<(), Error> {
+    fn compile_objects(&self, objs: &[Object]) -> Result<(), Error> {
         for obj in objs {
             let (mut cmd, name) = self.create_compile_object_cmd(obj)?;
-            run(&mut cmd, &name, print)?;
+            run(&mut cmd, &name, &self.cargo_output)?;
         }
 
         Ok(())
@@ -2145,13 +2149,7 @@ impl Build {
         Ok((cmd, tool.to_string()))
     }
 
-    fn assemble(
-        &self,
-        lib_name: &str,
-        dst: &Path,
-        objs: &[Object],
-        print: Option<&PrintThread>,
-    ) -> Result<(), Error> {
+    fn assemble(&self, lib_name: &str, dst: &Path, objs: &[Object]) -> Result<(), Error> {
         // Delete the destination if it exists as we want to
         // create on the first iteration instead of appending.
         let _ = fs::remove_file(dst);
@@ -2165,7 +2163,7 @@ impl Build {
             .chain(self.objects.iter().map(std::ops::Deref::deref))
             .collect();
         for chunk in objs.chunks(100) {
-            self.assemble_progressive(dst, chunk, print)?;
+            self.assemble_progressive(dst, chunk)?;
         }
 
         if self.cuda && self.cuda_file_count() > 0 {
@@ -2176,8 +2174,8 @@ impl Build {
             let dlink = out_dir.join(lib_name.to_owned() + "_dlink.o");
             let mut nvcc = self.get_compiler().to_command();
             nvcc.arg("--device-link").arg("-o").arg(&dlink).arg(dst);
-            run(&mut nvcc, "nvcc", print)?;
-            self.assemble_progressive(dst, &[dlink.as_path()], print)?;
+            run(&mut nvcc, "nvcc", &self.cargo_output)?;
+            self.assemble_progressive(dst, &[dlink.as_path()])?;
         }
 
         let target = self.get_target()?;
@@ -2209,18 +2207,13 @@ impl Build {
             // NOTE: We add `s` even if flags were passed using $ARFLAGS/ar_flag, because `s`
             // here represents a _mode_, not an arbitrary flag. Further discussion of this choice
             // can be seen in https://github.com/rust-lang/cc-rs/pull/763.
-            run(ar.arg("s").arg(dst), &cmd, print)?;
+            run(ar.arg("s").arg(dst), &cmd, &self.cargo_output)?;
         }
 
         Ok(())
     }
 
-    fn assemble_progressive(
-        &self,
-        dst: &Path,
-        objs: &[&Path],
-        print: Option<&PrintThread>,
-    ) -> Result<(), Error> {
+    fn assemble_progressive(&self, dst: &Path, objs: &[&Path]) -> Result<(), Error> {
         let target = self.get_target()?;
 
         if target.contains("msvc") {
@@ -2241,7 +2234,7 @@ impl Build {
                 cmd.arg(dst);
             }
             cmd.args(objs);
-            run(&mut cmd, &program, print)?;
+            run(&mut cmd, &program, &self.cargo_output)?;
         } else {
             let (mut ar, cmd, _any_flags) = self.get_ar()?;
 
@@ -2272,7 +2265,7 @@ impl Build {
             // NOTE: We add cq here regardless of whether $ARFLAGS/ar_flag have been used because
             // it dictates the _mode_ ar runs in, which the setter of $ARFLAGS/ar_flag can't
             // dictate. See https://github.com/rust-lang/cc-rs/pull/763 for further discussion.
-            run(ar.arg("cq").arg(dst).args(objs), &cmd, print)?;
+            run(ar.arg("cq").arg(dst).args(objs), &cmd, &self.cargo_output)?;
         }
 
         Ok(())
diff --git a/src/os_pipe/mod.rs b/src/os_pipe/mod.rs
deleted file mode 100644
index 0a6ad791f..000000000
--- a/src/os_pipe/mod.rs
+++ /dev/null
@@ -1,28 +0,0 @@
-//! Adapted from:
-//!  - <https://doc.rust-lang.org/src/std/sys/unix/pipe.rs.html>
-//!  - <https://doc.rust-lang.org/src/std/sys/unix/fd.rs.html#385>
-//!  - <https://github.com/rust-lang/rust/blob/master/library/std/src/sys/mod.rs#L57>
-//!  - <https://github.com/oconnor663/os_pipe.rs>
-use std::fs::File;
-
-/// Open a new pipe and return a pair of [`File`] objects for the reader and writer.
-///
-/// This corresponds to the `pipe2` library call on Posix and the
-/// `CreatePipe` library call on Windows (though these implementation
-/// details might change). These pipes are non-inheritable, so new child
-/// processes won't receive a copy of them unless they're explicitly
-/// passed as stdin/stdout/stderr.
-pub fn pipe() -> std::io::Result<(File, File)> {
-    sys::pipe()
-}
-
-#[cfg(unix)]
-#[path = "unix.rs"]
-mod sys;
-
-#[cfg(windows)]
-#[path = "windows.rs"]
-mod sys;
-
-#[cfg(all(not(unix), not(windows)))]
-compile_error!("Only unix and windows support os_pipe!");
diff --git a/src/os_pipe/unix.rs b/src/os_pipe/unix.rs
deleted file mode 100644
index ec4e547f0..000000000
--- a/src/os_pipe/unix.rs
+++ /dev/null
@@ -1,121 +0,0 @@
-use std::{
-    fs::File,
-    io,
-    os::{raw::c_int, unix::io::FromRawFd},
-};
-
-pub(super) fn pipe() -> io::Result<(File, File)> {
-    let mut fds = [0; 2];
-
-    // The only known way right now to create atomically set the CLOEXEC flag is
-    // to use the `pipe2` syscall. This was added to Linux in 2.6.27, glibc 2.9
-    // and musl 0.9.3, and some other targets also have it.
-    #[cfg(any(
-        target_os = "dragonfly",
-        target_os = "freebsd",
-        target_os = "linux",
-        target_os = "netbsd",
-        target_os = "openbsd",
-        target_os = "redox"
-    ))]
-    {
-        unsafe {
-            cvt(libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC))?;
-        }
-    }
-
-    #[cfg(not(any(
-        target_os = "dragonfly",
-        target_os = "freebsd",
-        target_os = "linux",
-        target_os = "netbsd",
-        target_os = "openbsd",
-        target_os = "redox"
-    )))]
-    {
-        unsafe {
-            cvt(libc::pipe(fds.as_mut_ptr()))?;
-        }
-
-        cloexec::set_cloexec(fds[0])?;
-        cloexec::set_cloexec(fds[1])?;
-    }
-
-    unsafe { Ok((File::from_raw_fd(fds[0]), File::from_raw_fd(fds[1]))) }
-}
-
-fn cvt(t: c_int) -> io::Result<c_int> {
-    if t == -1 {
-        Err(io::Error::last_os_error())
-    } else {
-        Ok(t)
-    }
-}
-
-#[cfg(not(any(
-    target_os = "dragonfly",
-    target_os = "freebsd",
-    target_os = "linux",
-    target_os = "netbsd",
-    target_os = "openbsd",
-    target_os = "redox"
-)))]
-mod cloexec {
-    use super::{c_int, cvt, io};
-
-    #[cfg(not(any(
-        target_env = "newlib",
-        target_os = "solaris",
-        target_os = "illumos",
-        target_os = "emscripten",
-        target_os = "fuchsia",
-        target_os = "l4re",
-        target_os = "linux",
-        target_os = "haiku",
-        target_os = "redox",
-        target_os = "vxworks",
-        target_os = "nto",
-    )))]
-    pub(super) fn set_cloexec(fd: c_int) -> io::Result<()> {
-        unsafe {
-            cvt(libc::ioctl(fd, libc::FIOCLEX))?;
-        }
-
-        Ok(())
-    }
-
-    #[cfg(any(
-        all(
-            target_env = "newlib",
-            not(any(target_os = "espidf", target_os = "horizon"))
-        ),
-        target_os = "solaris",
-        target_os = "illumos",
-        target_os = "emscripten",
-        target_os = "fuchsia",
-        target_os = "l4re",
-        target_os = "linux",
-        target_os = "haiku",
-        target_os = "redox",
-        target_os = "vxworks",
-        target_os = "nto",
-    ))]
-    pub(super) fn set_cloexec(fd: c_int) -> io::Result<()> {
-        unsafe {
-            let previous = cvt(libc::fcntl(fd, libc::F_GETFD))?;
-            let new = previous | libc::FD_CLOEXEC;
-            if new != previous {
-                cvt(libc::fcntl(fd, libc::F_SETFD, new))?;
-            }
-        }
-
-        Ok(())
-    }
-
-    // FD_CLOEXEC is not supported in ESP-IDF and Horizon OS but there's no need to,
-    // because neither supports spawning processes.
-    #[cfg(any(target_os = "espidf", target_os = "horizon"))]
-    pub(super) fn set_cloexec(_fd: c_int) -> io::Result<()> {
-        Ok(())
-    }
-}
diff --git a/src/os_pipe/windows.rs b/src/os_pipe/windows.rs
deleted file mode 100644
index de0106a2a..000000000
--- a/src/os_pipe/windows.rs
+++ /dev/null
@@ -1,24 +0,0 @@
-use crate::windows::windows_sys::{CreatePipe, INVALID_HANDLE_VALUE};
-use std::{fs::File, io, os::windows::prelude::*, ptr};
-
-/// NOTE: These pipes do not support IOCP.
-///
-/// If IOCP is needed, then you might want to emulate
-/// anonymous pipes with `CreateNamedPipe`, as Rust's stdlib does.
-pub(super) fn pipe() -> io::Result<(File, File)> {
-    let mut read_pipe = INVALID_HANDLE_VALUE;
-    let mut write_pipe = INVALID_HANDLE_VALUE;
-
-    let ret = unsafe { CreatePipe(&mut read_pipe, &mut write_pipe, ptr::null_mut(), 0) };
-
-    if ret == 0 {
-        Err(io::Error::last_os_error())
-    } else {
-        unsafe {
-            Ok((
-                File::from_raw_handle(read_pipe as RawHandle),
-                File::from_raw_handle(write_pipe as RawHandle),
-            ))
-        }
-    }
-}
diff --git a/src/parallel/mod.rs b/src/parallel/mod.rs
index 1c6d597ea..d69146dc5 100644
--- a/src/parallel/mod.rs
+++ b/src/parallel/mod.rs
@@ -1,5 +1,6 @@
 pub(crate) mod async_executor;
 pub(crate) mod job_token;
+pub(crate) mod stderr;
 
 /// Remove all element in `vec` which `f(element)` returns `false`.
 ///
diff --git a/src/parallel/stderr.rs b/src/parallel/stderr.rs
new file mode 100644
index 000000000..2c05f03f3
--- /dev/null
+++ b/src/parallel/stderr.rs
@@ -0,0 +1,75 @@
+/// Helpers functions for [ChildStderr].
+use std::process::ChildStderr;
+
+use crate::{Error, ErrorKind};
+
+#[cfg(all(not(unix), not(windows)))]
+compile_error!("Only unix and windows support non-blocking pipes! For other OSes, disable the parallel feature.");
+
+#[allow(unused_variables)]
+pub fn set_non_blocking(stderr: &mut ChildStderr) -> Result<(), Error> {
+    // On Unix, switch the pipe to non-blocking mode.
+    // On Windows, we have a different way to be non-blocking.
+    #[cfg(unix)]
+    {
+        use std::os::unix::io::AsRawFd;
+        let fd = stderr.as_raw_fd();
+        debug_assert_eq!(
+            unsafe { libc::fcntl(fd, libc::F_GETFL, 0) },
+            0,
+            "stderr should have no flags set"
+        );
+
+        if unsafe { libc::fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK) } != 0 {
+            return Err(Error::new(
+                ErrorKind::IOError,
+                format!(
+                    "Failed to set flags for child stderr: {}",
+                    std::io::Error::last_os_error()
+                ),
+            ));
+        }
+    }
+
+    Ok(())
+}
+
+pub fn bytes_available(stderr: &mut ChildStderr) -> Result<usize, Error> {
+    let mut bytes_available = 0;
+    #[cfg(windows)]
+    {
+        use crate::windows::windows_sys::PeekNamedPipe;
+        use std::os::windows::io::AsRawHandle;
+        use std::ptr::null_mut;
+        if unsafe {
+            PeekNamedPipe(
+                stderr.as_raw_handle(),
+                null_mut(),
+                0,
+                null_mut(),
+                &mut bytes_available,
+                null_mut(),
+            )
+        } == 0
+        {
+            return Err(Error::new(
+                ErrorKind::IOError,
+                format!(
+                    "PeekNamedPipe failed with {}",
+                    std::io::Error::last_os_error()
+                ),
+            ));
+        }
+    }
+    #[cfg(unix)]
+    {
+        use std::os::unix::io::AsRawFd;
+        if unsafe { libc::ioctl(stderr.as_raw_fd(), libc::FIONREAD, &mut bytes_available) } != 0 {
+            return Err(Error::new(
+                ErrorKind::IOError,
+                format!("ioctl failed with {}", std::io::Error::last_os_error()),
+            ));
+        }
+    }
+    Ok(bytes_available as usize)
+}
diff --git a/src/windows/windows_sys.rs b/src/windows/windows_sys.rs
index 20a256076..88ca76730 100644
--- a/src/windows/windows_sys.rs
+++ b/src/windows/windows_sys.rs
@@ -55,16 +55,18 @@ extern "system" {
 }
 #[link(name = "kernel32")]
 extern "system" {
-    pub fn CreatePipe(
-        hreadpipe: *mut HANDLE,
-        hwritepipe: *mut HANDLE,
-        lppipeattributes: *const SECURITY_ATTRIBUTES,
-        nsize: u32,
-    ) -> BOOL;
+    pub fn OpenSemaphoreA(dwdesiredaccess: u32, binherithandle: BOOL, lpname: PCSTR) -> HANDLE;
 }
 #[link(name = "kernel32")]
 extern "system" {
-    pub fn OpenSemaphoreA(dwdesiredaccess: u32, binherithandle: BOOL, lpname: PCSTR) -> HANDLE;
+    pub fn PeekNamedPipe(
+        hnamedpipe: HANDLE,
+        lpbuffer: *mut ::core::ffi::c_void,
+        nbuffersize: u32,
+        lpbytesread: *mut u32,
+        lptotalbytesavail: *mut u32,
+        lpbytesleftthismessage: *mut u32,
+    ) -> BOOL;
 }
 #[link(name = "kernel32")]
 extern "system" {
@@ -148,7 +150,6 @@ pub type HANDLE = *mut ::core::ffi::c_void;
 pub type HKEY = *mut ::core::ffi::c_void;
 pub const HKEY_LOCAL_MACHINE: HKEY = invalid_mut(-2147483646i32 as _);
 pub type HRESULT = i32;
-pub const INVALID_HANDLE_VALUE: HANDLE = invalid_mut(-1i32 as _);
 pub type IUnknown = *mut ::core::ffi::c_void;
 pub const KEY_READ: REG_SAM_FLAGS = 131097u32;
 pub const KEY_WOW64_32KEY: REG_SAM_FLAGS = 512u32;
@@ -184,18 +185,6 @@ impl ::core::clone::Clone for SAFEARRAYBOUND {
         *self
     }
 }
-#[repr(C)]
-pub struct SECURITY_ATTRIBUTES {
-    pub nLength: u32,
-    pub lpSecurityDescriptor: *mut ::core::ffi::c_void,
-    pub bInheritHandle: BOOL,
-}
-impl ::core::marker::Copy for SECURITY_ATTRIBUTES {}
-impl ::core::clone::Clone for SECURITY_ATTRIBUTES {
-    fn clone(&self) -> Self {
-        *self
-    }
-}
 pub const SEMAPHORE_MODIFY_STATE: SYNCHRONIZATION_ACCESS_RIGHTS = 2u32;
 pub type SYNCHRONIZATION_ACCESS_RIGHTS = u32;
 pub const S_FALSE: HRESULT = 1i32;