diff options
author | Jesse Luehrs <doy@tozt.net> | 2022-02-22 17:02:12 -0500 |
---|---|---|
committer | Jesse Luehrs <doy@tozt.net> | 2022-02-23 02:44:18 -0500 |
commit | 39287b07f87aba15c4cb0f64d7008ba67289151d (patch) | |
tree | a0ce5fee6e98a7c429f668000d7cdc71cf8d4797 | |
parent | ebcf5f15081f6a84c861eb2aecbf962396a88695 (diff) | |
download | pty-process-39287b07f87aba15c4cb0f64d7008ba67289151d.tar.gz pty-process-39287b07f87aba15c4cb0f64d7008ba67289151d.zip |
another rewrite
-rw-r--r-- | CHANGELOG.md | 7 | ||||
-rw-r--r-- | Cargo.toml | 18 | ||||
-rw-r--r-- | README.md | 49 | ||||
-rw-r--r-- | deny.toml | 2 | ||||
-rw-r--r-- | examples/async-std.rs | 85 | ||||
-rw-r--r-- | examples/basic.rs | 9 | ||||
-rw-r--r-- | examples/interhack.rs | 87 | ||||
-rw-r--r-- | examples/smol.rs | 84 | ||||
-rw-r--r-- | examples/tokio.rs | 17 | ||||
-rw-r--r-- | src/blocking/command.rs | 23 | ||||
-rw-r--r-- | src/blocking/mod.rs | 2 | ||||
-rw-r--r-- | src/blocking/pty.rs | 65 | ||||
-rw-r--r-- | src/command.rs | 64 | ||||
-rw-r--r-- | src/error.rs | 9 | ||||
-rw-r--r-- | src/lib.rs | 26 | ||||
-rw-r--r-- | src/pty.rs | 328 | ||||
-rw-r--r-- | src/sys.rs | 172 | ||||
-rw-r--r-- | tests/basic.rs | 50 | ||||
-rw-r--r-- | tests/behavior.rs | 315 | ||||
-rw-r--r-- | tests/fds.rs | 8 | ||||
-rw-r--r-- | tests/fds_async.rs | 61 | ||||
-rw-r--r-- | tests/helpers/mod.rs | 23 | ||||
-rw-r--r-- | tests/pipe.rs | 131 | ||||
-rw-r--r-- | tests/split.rs | 155 | ||||
-rw-r--r-- | tests/winch.rs | 41 |
25 files changed, 1108 insertions, 723 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 62c4472..467fc45 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## [Unreleased] + +### Changed + +* Complete rewrite of the API +* Tokio is now the only supported backend, enabled via the `async` feature + ## [0.2.0] - 2021-12-15 ### Changed @@ -13,27 +13,21 @@ license = "MIT" include = ["src/**/*", "LICENSE", "README.md", "CHANGELOG.md"] [dependencies] -libc = "0.2.112" +libc = "0.2.119" nix = "0.23.1" -async-io = { version = "1.6.0", optional = true } -async-process = { version = "1.3.0", optional = true } -futures-io = { version = "0.3.19", optional = true } +tokio = { version = "1.17.0", features = ["fs", "process", "net"], optional = true } [dev-dependencies] -async-executor = "1.4.1" -async-std = { version = "1.10.0", features = ["unstable"] } -futures = "0.3.19" +futures = "0.3.21" regex = "1.5.4" -smol = "1.2.5" term_size = "0.3.2" -tokio = { version = "1.15.0", features = ["full"] } -tokio-util = { version = "0.6.9", features = ["compat"] } +tokio = { version = "1.17.0", features = ["full"] } [features] default = [] -async = ["async-io", "async-process", "futures-io"] +async = ["tokio"] [patch.crates-io] -async-process = { git = "https://github.com/doy/async-process" } +nix = { path = "../src/nix" } @@ -1,36 +1,31 @@ # pty-process -This crate adds a helper method to `std::process::Command` (and optionally its -equivalent in various async frameworks) to allocate a pty to spawn the process -into. This allows for manipulation of interactive programs. +This crate is a wrapper around [`tokio::process::Command`] or +[`std::process::Command`] which provides the ability to allocate a pty +and spawn new processes attached to that pty, with the pty as their +controlling terminal. This allows for manipulation of interactive +programs. -The basic functionality is provided by the `Command` trait in this crate: +The basic functionality looks like this: ```rust -use pty_process::Command as _; - -let mut cmd = std::process::Command::new("nethack"); -let child = cmd.spawn_pty(Some(&pty_process::Size::new(24, 80))).unwrap(); +let mut pty = pty_process::Pty::new().unwrap(); +pty.resize(pty_process::Size::new(24, 80)).unwrap(); +let mut cmd = pty_process::Command::new("nethack"); +let child = cmd.spawn(&pty.pts().unwrap()).unwrap(); ``` -The `child` instance returned by the call to `spawn_pty` is a thin wrapper -around the `Child` struct associated with the `Command` module used. You can -use it identically to how you would use the normal `std::process::Child` -instance, but it also provides additional methods for interacting with the pty: - -```rust -use std::io::Write as _; - -child.pty().write_all(b"foo\n").unwrap(); -child.resize_pty(&pty_process::Size::new(30, 100)).unwrap(); -let status = child.wait().unwrap(); -``` +The returned `child` is a normal instance of [`tokio::process::Child`] (or +[`std::process::Child`] for the [`blocking`](crate::blocking) variant), +with its `stdin`/`stdout`/`stderr` file descriptors pointing at the given +pty. The `pty` instance implements [`tokio::io::AsyncRead`] and +[`tokio::io::AsyncWrite`] (or [`std::io::Read`] and [`std::io::Write`] for +the [`blocking`] variant), and can be used to communicate with the child +process. The child process will also be made a session leader of a new +session, and the controlling terminal of that session will be set to the +given pty. -The available implementations are gated by features: -* `backend-std`: Add an implementation for `std::process::Command`. Enabled by - default. -* `backend-smol`: Add an implementation for `smol::process::Command`. -* `backend-async-std`: Add an implementation for `async_std::process::Command`. -* `backend-tokio`: Add an implementation for `tokio::process::Command`. +## Features -Any number of backends may be enabled, depending on your needs. +By default, only the [`blocking`](crate::blocking) APIs are available. To +include the asynchronous APIs, you must enable the `async` feature. @@ -10,5 +10,5 @@ unsound = "deny" [bans] [licenses] -allow = ["MIT", "Apache-2.0", "BSD-3-Clause"] +allow = ["MIT", "Apache-2.0"] copyleft = "deny" diff --git a/examples/async-std.rs b/examples/async-std.rs deleted file mode 100644 index 970841c..0000000 --- a/examples/async-std.rs +++ /dev/null @@ -1,85 +0,0 @@ -mod raw_guard; - -#[cfg(feature = "async")] -mod main { - use async_std::io::prelude::WriteExt as _; - use async_std::io::ReadExt as _; - use async_std::prelude::FutureExt as _; - - pub async fn run( - child: &async_process::Child, - pty: &pty_process::Pty, - ) -> std::result::Result<(), Box<dyn std::error::Error + 'static>> { - let _raw = super::raw_guard::RawGuard::new(); - - let mut input_pty = pty; - let mut output_pty = pty; - let ex = async_executor::Executor::new(); - - let input = ex.spawn(async { - let mut buf = [0_u8; 4096]; - let mut stdin = async_std::io::stdin(); - loop { - match stdin.read(&mut buf).await { - Ok(bytes) => { - input_pty.write_all(&buf[..bytes]).await.unwrap(); - } - Err(e) => { - eprintln!("stdin read failed: {:?}", e); - break; - } - } - } - }); - let output = ex.spawn(async { - let mut buf = [0_u8; 4096]; - let mut stdout = async_std::io::stdout(); - loop { - match output_pty.read(&mut buf).await { - Ok(bytes) => { - stdout.write_all(&buf[..bytes]).await.unwrap(); - stdout.flush().await.unwrap(); - } - Err(e) => { - eprintln!("pty read failed: {:?}", e); - break; - } - } - } - }); - - let wait = async { - child.status_no_drop().await.unwrap(); - }; - - ex.run(input.race(output).race(wait)).await; - - Ok(()) - } -} - -#[cfg(feature = "async")] -fn main() { - use std::os::unix::process::ExitStatusExt as _; - - let status = async_std::task::block_on(async { - let pty = pty_process::Pty::new().unwrap(); - pty.resize(pty_process::Size::new(24, 80)).unwrap(); - let mut child = pty_process::Command::new("tac") - // .args(&["500"]) - .spawn(&pty) - .unwrap(); - main::run(&child, &pty).await.unwrap(); - child.status().await.unwrap() - }); - std::process::exit( - status - .code() - .unwrap_or_else(|| status.signal().unwrap_or(0) + 128), - ); -} - -#[cfg(not(feature = "async"))] -fn main() { - unimplemented!() -} diff --git a/examples/basic.rs b/examples/basic.rs index 4f996b9..8a56e57 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -6,7 +6,7 @@ mod main { pub fn run( child: &mut std::process::Child, - mut pty: &pty_process::blocking::Pty, + pty: &mut pty_process::blocking::Pty, ) { let _raw = super::raw_guard::RawGuard::new(); let mut buf = [0_u8; 4096]; @@ -75,14 +75,15 @@ mod main { fn main() { use std::os::unix::process::ExitStatusExt as _; - let pty = pty_process::blocking::Pty::new().unwrap(); + let mut pty = pty_process::blocking::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); pty.resize(pty_process::Size::new(24, 80)).unwrap(); let mut child = pty_process::blocking::Command::new("tac") // .args(&["500"]) - .spawn(&pty) + .spawn(&pts) .unwrap(); - main::run(&mut child, &pty); + main::run(&mut child, &mut pty); let status = child.wait().unwrap(); std::process::exit( diff --git a/examples/interhack.rs b/examples/interhack.rs index 212c12e..5fe1474 100644 --- a/examples/interhack.rs +++ b/examples/interhack.rs @@ -2,68 +2,63 @@ mod raw_guard; #[cfg(feature = "async")] mod main { - use smol::io::{AsyncReadExt as _, AsyncWriteExt as _}; + use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; pub async fn run( - child: &async_process::Child, - pty: &pty_process::Pty, + child: &mut tokio::process::Child, + pty: &mut pty_process::Pty, ) -> std::result::Result<(), Box<dyn std::error::Error>> { let _raw = super::raw_guard::RawGuard::new(); - let mut input_pty = pty; - let mut output_pty = pty; - let ex = smol::Executor::new(); + let mut in_buf = [0_u8; 4096]; + let mut out_buf = [0_u8; 4096]; - let input = ex.spawn(async { - let mut buf = [0_u8; 4096]; - let mut stdin = smol::Unblock::new(std::io::stdin()); - loop { - match stdin.read(&mut buf).await { + let mut stdin = tokio::io::stdin(); + let mut stdout = tokio::io::stdout(); + + #[allow(clippy::trivial_regex)] + let re = regex::bytes::Regex::new("Elbereth").unwrap(); + + loop { + tokio::select! { + bytes = stdin.read(&mut in_buf) => match bytes { Ok(bytes) => { // engrave Elbereth with ^E - if buf[..bytes].contains(&5u8) { - for byte in buf[..bytes].iter() { + if in_buf[..bytes].contains(&5u8) { + for byte in in_buf[..bytes].iter() { match byte { - 5u8 => input_pty + 5u8 => pty .write_all(b"E- Elbereth\n") .await .unwrap(), - _ => input_pty + _ => pty .write_all(&[*byte]) .await .unwrap(), } } } else { - input_pty.write_all(&buf[..bytes]).await.unwrap(); + pty.write_all(&in_buf[..bytes]).await.unwrap(); } } Err(e) => { eprintln!("stdin read failed: {:?}", e); break; } - } - } - }); - let output = ex.spawn(async { - let mut buf = [0_u8; 4096]; - let mut stdout = smol::Unblock::new(std::io::stdout()); - #[allow(clippy::trivial_regex)] - let re = regex::bytes::Regex::new("Elbereth").unwrap(); - loop { - match output_pty.read(&mut buf).await { + }, + bytes = pty.read(&mut out_buf) => match bytes { Ok(bytes) => { // highlight successful Elbereths - if re.is_match(&buf[..bytes]) { + if re.is_match(&out_buf[..bytes]) { stdout .write_all(&re.replace_all( - &buf[..bytes], + &out_buf[..bytes], &b"\x1b[35m$0\x1b[m"[..], )) .await .unwrap(); } else { - stdout.write_all(&buf[..bytes]).await.unwrap(); + stdout.write_all(&out_buf[..bytes]).await.unwrap(); } stdout.flush().await.unwrap(); } @@ -71,38 +66,26 @@ mod main { eprintln!("pty read failed: {:?}", e); break; } - } + }, + _ = child.wait() => break, } - }); - - let wait = async { - child.status_no_drop().await.unwrap(); - }; - - ex.run(smol::future::or(smol::future::or(input, output), wait)) - .await; + } Ok(()) } } #[cfg(feature = "async")] -fn main() { +#[tokio::main] +async fn main() { use std::os::unix::process::ExitStatusExt as _; - let (w, h) = if let Some((w, h)) = term_size::dimensions() { - (w as u16, h as u16) - } else { - (80, 24) - }; - let status = smol::block_on(async { - let pty = pty_process::Pty::new().unwrap(); - pty.resize(pty_process::Size::new(h, w)).unwrap(); - let mut child = - pty_process::Command::new("nethack").spawn(&pty).unwrap(); - main::run(&child, &pty).await.unwrap(); - child.status().await.unwrap() - }); + let mut pty = pty_process::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); + pty.resize(pty_process::Size::new(24, 80)).unwrap(); + let mut child = pty_process::Command::new("nethack").spawn(&pts).unwrap(); + main::run(&mut child, &mut pty).await.unwrap(); + let status = child.wait().await.unwrap(); std::process::exit( status .code() diff --git a/examples/smol.rs b/examples/smol.rs deleted file mode 100644 index 651ff10..0000000 --- a/examples/smol.rs +++ /dev/null @@ -1,84 +0,0 @@ -mod raw_guard; - -#[cfg(feature = "async")] -mod main { - use smol::io::{AsyncReadExt as _, AsyncWriteExt as _}; - - pub async fn run( - child: &async_process::Child, - pty: &pty_process::Pty, - ) -> std::result::Result<(), Box<dyn std::error::Error>> { - let _raw = super::raw_guard::RawGuard::new(); - - let mut input_pty = pty; - let mut output_pty = pty; - let ex = smol::Executor::new(); - - let input = ex.spawn(async { - let mut buf = [0_u8; 4096]; - let mut stdin = smol::Unblock::new(std::io::stdin()); - loop { - match stdin.read(&mut buf).await { - Ok(bytes) => { - input_pty.write_all(&buf[..bytes]).await.unwrap(); - } - Err(e) => { - eprintln!("stdin read failed: {:?}", e); - break; - } - } - } - }); - let output = ex.spawn(async { - let mut buf = [0_u8; 4096]; - let mut stdout = smol::Unblock::new(std::io::stdout()); - loop { - match output_pty.read(&mut buf).await { - Ok(bytes) => { - stdout.write_all(&buf[..bytes]).await.unwrap(); - stdout.flush().await.unwrap(); - } - Err(e) => { - eprintln!("pty read failed: {:?}", e); - break; - } - } - } - }); - - let wait = async { - child.status_no_drop().await.unwrap(); - }; - - ex.run(smol::future::or(smol::future::or(input, output), wait)) - .await; - - Ok(()) - } -} - -#[cfg(feature = "async")] -fn main() { - use std::os::unix::process::ExitStatusExt as _; - - let status = smol::block_on(async { - let pty = pty_process::Pty::new().unwrap(); - pty.resize(pty_process::Size::new(24, 80)).unwrap(); - let mut child = pty_process::Command::new("tac") - // .args(&["500"]) - .spawn(&pty) - .unwrap(); - main::run(&child, &pty).await.unwrap(); - child.status().await.unwrap() - }); - std::process::exit( - status - .code() - .unwrap_or_else(|| status.signal().unwrap_or(0) + 128), - ); -} - -#[cfg(not(feature = "async"))] -fn main() { - unimplemented!() -} diff --git a/examples/tokio.rs b/examples/tokio.rs index 55436c2..dc11855 100644 --- a/examples/tokio.rs +++ b/examples/tokio.rs @@ -3,11 +3,10 @@ mod raw_guard; #[cfg(feature = "async")] mod main { use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; - use tokio_util::compat::FuturesAsyncReadCompatExt as _; pub async fn run( - child: &async_process::Child, - pty: &pty_process::Pty, + child: &mut tokio::process::Child, + pty: &mut pty_process::Pty, ) -> std::result::Result<(), Box<dyn std::error::Error>> { let _raw = super::raw_guard::RawGuard::new(); @@ -16,7 +15,6 @@ mod main { let mut stdin = tokio::io::stdin(); let mut stdout = tokio::io::stdout(); - let mut pty = pty.compat(); loop { tokio::select! { @@ -39,7 +37,7 @@ mod main { break; } }, - _ = child.status_no_drop() => break, + _ = child.wait() => break, } } @@ -52,14 +50,15 @@ mod main { async fn main() { use std::os::unix::process::ExitStatusExt as _; - let pty = pty_process::Pty::new().unwrap(); + let mut pty = pty_process::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); pty.resize(pty_process::Size::new(24, 80)).unwrap(); let mut child = pty_process::Command::new("tac") // .args(&["500"]) - .spawn(&pty) + .spawn(&pts) .unwrap(); - main::run(&child, &pty).await.unwrap(); - let status = child.status().await.unwrap(); + main::run(&mut child, &mut pty).await.unwrap(); + let status = child.wait().await.unwrap(); std::process::exit( status .code() diff --git a/src/blocking/command.rs b/src/blocking/command.rs index 6c4a825..9cacebe 100644 --- a/src/blocking/command.rs +++ b/src/blocking/command.rs @@ -117,14 +117,13 @@ impl Command { } /// Executes the command as a child process via - /// [`std::process::Command::spawn`], and attaches the given `pty` to - /// that child. The pty will be attached to all of `stdin`, `stdout`, and - /// `stderr` of the child, unless those file descriptors were previously - /// overridden through calls to [`stdin`](Self::stdin), - /// [`stdout`](Self::stdout), or [`stderr`](Self::stderr). The newly - /// created child process will also be made the session leader of a new - /// session, and will have the given `pty` instance set as its controlling - /// terminal. + /// [`std::process::Command::spawn`] on the given pty. The pty will be + /// attached to all of `stdin`, `stdout`, and `stderr` of the child, + /// unless those file descriptors were previously overridden through calls + /// to [`stdin`](Self::stdin), [`stdout`](Self::stdout), or + /// [`stderr`](Self::stderr). The newly created child process will also be + /// made the session leader of a new session, and will have the given + /// pty set as its controlling terminal. /// /// # Errors /// Returns an error if we fail to allocate new file descriptors for @@ -134,10 +133,9 @@ impl Command { /// session leader or set its controlling terminal. pub fn spawn( &mut self, - pty: &crate::blocking::Pty, + pts: &crate::blocking::Pts, ) -> crate::Result<std::process::Child> { - let pts = pty.pts(); - let (stdin, stdout, stderr) = crate::sys::setup_subprocess(pts)?; + let (stdin, stdout, stderr) = pts.0.setup_subprocess()?; if !self.stdin { self.inner.stdin(stdin); @@ -149,7 +147,7 @@ impl Command { self.inner.stderr(stderr); } - let mut session_leader = crate::sys::session_leader(pts); + let mut session_leader = pts.0.session_leader(); // Safety: setsid() is an async-signal-safe function and ioctl() is a // raw syscall (which is inherently async-signal-safe). if let Some(mut custom) = self.pre_exec.take() { @@ -190,6 +188,7 @@ impl Command { self } + /// See [`std::os::unix::process::CommandExt::arg0`] pub fn arg0<S>(&mut self, arg: S) -> &mut Self where S: AsRef<std::ffi::OsStr>, diff --git a/src/blocking/mod.rs b/src/blocking/mod.rs index 17a0733..9f4e284 100644 --- a/src/blocking/mod.rs +++ b/src/blocking/mod.rs @@ -4,4 +4,4 @@ mod command; pub use command::Command; mod pty; -pub use pty::Pty; +pub use pty::{Pts, Pty}; diff --git a/src/blocking/pty.rs b/src/blocking/pty.rs index 6fa3382..e2c5bde 100644 --- a/src/blocking/pty.rs +++ b/src/blocking/pty.rs @@ -1,8 +1,5 @@ /// An allocated pty -pub struct Pty { - pt: std::fs::File, - pts: std::fs::File, -} +pub struct Pty(crate::sys::Pty); impl Pty { /// Allocate and return a new pty. @@ -10,12 +7,7 @@ impl Pty { /// # Errors /// Returns an error if the pty failed to be allocated. pub fn new() -> crate::Result<Self> { - let (pt, ptsname) = crate::sys::create_pt()?; - let pts = std::fs::OpenOptions::new() - .read(true) - .write(true) - .open(&ptsname)?; - Ok(Self { pt, pts }) + Ok(Self(crate::sys::Pty::open()?)) } /// Change the terminal size associated with the pty. @@ -23,51 +15,60 @@ impl Pty { /// # Errors /// Returns an error if we were unable to set the terminal size. pub fn resize(&self, size: crate::Size) -> crate::Result<()> { - Ok(crate::sys::set_term_size(self, size)?) + self.0.set_term_size(size) } - pub(crate) fn pts(&self) -> &std::fs::File { - &self.pts + /// Opens a file descriptor for the other end of the pty, which should be + /// attached to the child process running in it. See + /// [`Command::spawn`](crate::blocking::Command::spawn). + /// + /// # Errors + /// Returns an error if the device node to open could not be determined, + /// or if the device node could not be opened. + pub fn pts(&self) -> crate::Result<Pts> { + Ok(Pts(self.0.pts()?)) } } -impl std::ops::Deref for Pty { - type Target = std::fs::File; - - fn deref(&self) -> &Self::Target { - &self.pt +impl std::os::unix::io::AsRawFd for Pty { + fn as_raw_fd(&self) -> std::os::unix::io::RawFd { + self.0 .0.as_raw_fd() } } -impl std::ops::DerefMut for Pty { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.pt +impl std::io::Read for Pty { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { + self.0 .0.read(buf) } } -impl std::os::unix::io::AsRawFd for Pty { - fn as_raw_fd(&self) -> std::os::unix::io::RawFd { - self.pt.as_raw_fd() +impl std::io::Write for Pty { + fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { + self.0 .0.write(buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.0 .0.flush() } } -// there is a Read impl for &std::fs::File, but without this explicit impl, -// rust finds the Read impl for std::fs::File first, and then complains that -// it requires &mut self, because method resolution/autoderef doesn't take -// mutability into account impl std::io::Read for &Pty { fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { - (&self.pt).read(buf) + (&self.0 .0).read(buf) } } -// same as above impl std::io::Write for &Pty { fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { - (&self.pt).write(buf) + (&self.0 .0).write(buf) } fn flush(&mut self) -> std::io::Result<()> { - (&self.pt).flush() + (&self.0 .0).flush() } } + +/// The child end of the pty +/// +/// See [`Pty::pts`] and [`Command::spawn`](crate::blocking::Command::spawn) +pub struct Pts(pub(crate) crate::sys::Pts); diff --git a/src/command.rs b/src/command.rs index 8627a35..7ff9858 100644 --- a/src/command.rs +++ b/src/command.rs @@ -1,8 +1,6 @@ -use async_process::unix::CommandExt as _; - -/// Wrapper around [`async_process::Command`] +/// Wrapper around [`tokio::process::Command`] pub struct Command { - inner: async_process::Command, + inner: tokio::process::Command, stdin: bool, stdout: bool, stderr: bool, @@ -13,10 +11,10 @@ pub struct Command { } impl Command { - /// See [`async_process::Command::new`] + /// See [`tokio::process::Command::new`] pub fn new<S: AsRef<std::ffi::OsStr>>(program: S) -> Self { Self { - inner: async_process::Command::new(program), + inner: tokio::process::Command::new(program), stdin: false, stdout: false, stderr: false, @@ -25,13 +23,13 @@ impl Command { } } - /// See [`async_process::Command::arg`] + /// See [`tokio::process::Command::arg`] pub fn arg<S: AsRef<std::ffi::OsStr>>(&mut self, arg: S) -> &mut Self { self.inner.arg(arg); self } - /// See [`async_process::Command::args`] + /// See [`tokio::process::Command::args`] pub fn args<I, S>(&mut self, args: I) -> &mut Self where I: IntoIterator<Item = S>, @@ -41,7 +39,7 @@ impl Command { self } - /// See [`async_process::Command::env`] + /// See [`tokio::process::Command::env`] pub fn env<K, V>(&mut self, key: K, val: V) -> &mut Self where K: AsRef<std::ffi::OsStr>, @@ -51,7 +49,7 @@ impl Command { self } - /// See [`async_process::Command::envs`] + /// See [`tokio::process::Command::envs`] pub fn envs<I, K, V>(&mut self, vars: I) -> &mut Self where I: IntoIterator<Item = (K, V)>, @@ -62,7 +60,7 @@ impl Command { self } - /// See [`async_process::Command::env_remove`] + /// See [`tokio::process::Command::env_remove`] pub fn env_remove<K: AsRef<std::ffi::OsStr>>( &mut self, key: K, @@ -71,13 +69,13 @@ impl Command { self } - /// See [`async_process::Command::env_clear`] + /// See [`tokio::process::Command::env_clear`] pub fn env_clear(&mut self) -> &mut Self { self.inner.env_clear(); self } - /// See [`async_process::Command::current_dir`] + /// See [`tokio::process::Command::current_dir`] pub fn current_dir<P: AsRef<std::path::Path>>( &mut self, dir: P, @@ -86,7 +84,7 @@ impl Command { self } - /// See [`async_process::Command::stdin`] + /// See [`tokio::process::Command::stdin`] pub fn stdin<T: Into<std::process::Stdio>>( &mut self, cfg: T, @@ -96,7 +94,7 @@ impl Command { self } - /// See [`async_process::Command::stdout`] + /// See [`tokio::process::Command::stdout`] pub fn stdout<T: Into<std::process::Stdio>>( &mut self, cfg: T, @@ -106,7 +104,7 @@ impl Command { self } - /// See [`async_process::Command::stderr`] + /// See [`tokio::process::Command::stderr`] pub fn stderr<T: Into<std::process::Stdio>>( &mut self, cfg: T, @@ -117,27 +115,25 @@ impl Command { } /// Executes the command as a child process via - /// [`async_process::Command::spawn`], and attaches the given `pty` to - /// that child. The pty will be attached to all of `stdin`, `stdout`, and - /// `stderr` of the child, unless those file descriptors were previously - /// overridden through calls to [`stdin`](Self::stdin), - /// [`stdout`](Self::stdout), or [`stderr`](Self::stderr). The newly - /// created child process will also be made the session leader of a new - /// session, and will have the given `pty` instance set as its controlling - /// terminal. + /// [`tokio::process::Command::spawn`] on the given pty. The pty will be + /// attached to all of `stdin`, `stdout`, and `stderr` of the child, + /// unless those file descriptors were previously overridden through calls + /// to [`stdin`](Self::stdin), [`stdout`](Self::stdout), or + /// [`stderr`](Self::stderr). The newly created child process will also be + /// made the session leader of a new session, and will have the given + /// pty set as its controlling terminal. /// /// # Errors /// Returns an error if we fail to allocate new file descriptors for /// attaching the pty to the child process, or if we fail to spawn the /// child process (see the documentation for - /// [`async_process::Command::spawn`]), or if we fail to make the child a + /// [`tokio::process::Command::spawn`]), or if we fail to make the child a /// session leader or set its controlling terminal. pub fn spawn( &mut self, - pty: &crate::Pty, - ) -> crate::Result<async_process::Child> { - let pts = pty.pts(); - let (stdin, stdout, stderr) = crate::sys::setup_subprocess(pts)?; + pts: &crate::Pts, + ) -> crate::Result<tokio::process::Child> { + let (stdin, stdout, stderr) = pts.0.setup_subprocess()?; if !self.stdin { self.inner.stdin(stdin); @@ -149,7 +145,7 @@ impl Command { self.inner.stderr(stderr); } - let mut session_leader = crate::sys::session_leader(pts); + let mut session_leader = pts.0.session_leader(); // Safety: setsid() is an async-signal-safe function and ioctl() is a // raw syscall (which is inherently async-signal-safe). if let Some(mut custom) = self.pre_exec.take() { @@ -168,19 +164,19 @@ impl Command { Ok(self.inner.spawn()?) } - /// See [`async_process::unix::CommandExt::uid`] + /// See [`tokio::process::Command::uid`] pub fn uid(&mut self, id: u32) -> &mut Self { self.inner.uid(id); self } - /// See [`async_process::unix::CommandExt::gid`] + /// See [`tokio::process::Command::gid`] pub fn gid(&mut self, id: u32) -> &mut Self { self.inner.gid(id); self } - /// See [`async_process::unix::CommandExt::pre_exec`] + /// See [`tokio::process::Command::pre_exec`] #[allow(clippy::missing_safety_doc)] pub unsafe fn pre_exec<F>(&mut self, f: F) -> &mut Self where @@ -190,7 +186,7 @@ impl Command { self } - /// See [`async_process::unix::CommandExt::arg0`] + /// See [`tokio::process::Command::arg0`] pub fn arg0<S>(&mut self, arg: S) -> &mut Self where S: AsRef<std::ffi::OsStr>, diff --git a/src/error.rs b/src/error.rs index 9d4c31e..db9c647 100644 --- a/src/error.rs +++ b/src/error.rs @@ -5,6 +5,9 @@ pub enum Error { Io(std::io::Error), /// error came from nix::Error Nix(nix::Error), + /// unsplit was called on halves of two different ptys + #[cfg(feature = "async")] + Unsplit(crate::OwnedReadPty, crate::OwnedWritePty), } impl std::fmt::Display for Error { @@ -12,6 +15,10 @@ impl std::fmt::Display for Error { match self { Self::Io(e) => write!(f, "{}", e), Self::Nix(e) => write!(f, "{}", e), + #[cfg(feature = "async")] + Self::Unsplit(..) => { + write!(f, "unsplit called on halves of two different ptys") + } } } } @@ -33,6 +40,8 @@ impl std::error::Error for Error { match self { Self::Io(e) => Some(e), Self::Nix(e) => Some(e), + #[cfg(feature = "async")] + Self::Unsplit(..) => None, } } } @@ -1,5 +1,5 @@ -//! This crate is a wrapper around [`std::process::Command`] or -//! [`async_process::Command`] which provides the ability to allocate a pty +//! This crate is a wrapper around [`tokio::process::Command`] or +//! [`std::process::Command`] which provides the ability to allocate a pty //! and spawn new processes attached to that pty, with the pty as their //! controlling terminal. This allows for manipulation of interactive //! programs. @@ -10,25 +10,23 @@ //! let mut pty = pty_process::Pty::new().unwrap(); //! pty.resize(pty_process::Size::new(24, 80)).unwrap(); //! let mut cmd = pty_process::Command::new("nethack"); -//! let child = cmd.spawn(&pty).unwrap(); +//! let child = cmd.spawn(&pty.pts().unwrap()).unwrap(); //! ``` //! -//! The returned `child` is a normal instance of [`async_process::Child`] (or +//! The returned `child` is a normal instance of [`tokio::process::Child`] (or //! [`std::process::Child`] for the [`blocking`](crate::blocking) variant), //! with its `stdin`/`stdout`/`stderr` file descriptors pointing at the given -//! pty. The `pty` instance implements [`std::io::Read`] and -//! [`std::io::Write`] (or [`futures_io::AsyncRead`] and -//! [`futures_io::AsyncWrite`] for the [`blocking`] variant), and can be used -//! to communicate with the child process. The child process will also be made -//! a session leader of a new session, and the controlling terminal of that -//! session will be set to the given pty. +//! pty. The `pty` instance implements [`tokio::io::AsyncRead`] and +//! [`tokio::io::AsyncWrite`] (or [`std::io::Read`] and [`std::io::Write`] for +//! the [`blocking`] variant), and can be used to communicate with the child +//! process. The child process will also be made a session leader of a new +//! session, and the controlling terminal of that session will be set to the +//! given pty. //! //! # Features //! //! By default, only the [`blocking`](crate::blocking) APIs are available. To -//! include the asynchronous APIs, you must enable the `async` feature. See -//! the `examples` directory in the repository for examples of how to use this -//! crate with the various different asynchronous frameworks. +//! include the asynchronous APIs, you must enable the `async` feature. #![warn(clippy::cargo)] #![warn(clippy::pedantic)] @@ -59,4 +57,4 @@ pub use command::Command; #[cfg(feature = "async")] mod pty; #[cfg(feature = "async")] -pub use pty::Pty; +pub use pty::{OwnedReadPty, OwnedWritePty, Pts, Pty, ReadPty, WritePty}; @@ -1,8 +1,11 @@ +#![allow(clippy::module_name_repetitions)] + +use std::io::{Read as _, Write as _}; + +type AsyncPty = tokio::io::unix::AsyncFd<crate::sys::Pty>; + /// An allocated pty -pub struct Pty { - pt: async_io::Async<std::fs::File>, - pts: std::fs::File, -} +pub struct Pty(AsyncPty); impl Pty { /// Allocate and return a new pty. @@ -11,13 +14,9 @@ impl Pty { /// Returns an error if the pty failed to be allocated, or if we were /// unable to put it into non-blocking mode. pub fn new() -> crate::Result<Self> { - let (pt, ptsname) = crate::sys::create_pt()?; - let pt = async_io::Async::new(pt)?; - let pts = std::fs::OpenOptions::new() - .read(true) - .write(true) - .open(&ptsname)?; - Ok(Self { pt, pts }) + let pty = crate::sys::Pty::open()?; + pty.set_nonblocking()?; + Ok(Self(tokio::io::unix::AsyncFd::new(pty)?)) } /// Change the terminal size associated with the pty. @@ -25,69 +24,318 @@ impl Pty { /// # Errors /// Returns an error if we were unable to set the terminal size. pub fn resize(&self, size: crate::Size) -> crate::Result<()> { - Ok(crate::sys::set_term_size(self, size)?) + self.0.get_ref().set_term_size(size) } - pub(crate) fn pts(&self) -> &std::fs::File { - &self.pts + /// Opens a file descriptor for the other end of the pty, which should be + /// attached to the child process running in it. See + /// [`Command::spawn`](crate::Command::spawn). + /// + /// # Errors + /// Returns an error if the device node to open could not be determined, + /// or if the device node could not be opened. + pub fn pts(&self) -> crate::Result<Pts> { + Ok(Pts(self.0.get_ref().pts()?)) } -} - -impl std::ops::Deref for Pty { - type Target = async_io::Async<std::fs::File>; - fn deref(&self) -> &Self::Target { - &self.pt + /// Splits a `Pty` into a read half and a write half, which can be used to + /// read from and write to the pty concurrently. Does not allocate, but + /// the returned halves cannot be moved to independent tasks. + pub fn split(&mut self) -> (ReadPty<'_>, WritePty<'_>) { + (ReadPty(&self.0), WritePty(&self.0)) } -} -impl std::ops::DerefMut for Pty { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.pt + /// Splits a `Pty` into a read half and a write half, which can be used to + /// read from and write to the pty concurrently. This method requires an + /// allocation, but the returned halves can be moved to independent tasks. + /// The original `Pty` instance can be recovered via the + /// [`OwnedReadPty::unsplit`] method. + #[must_use] + pub fn into_split(self) -> (OwnedReadPty, OwnedWritePty) { + let Self(pt) = self; + let read_pt = std::sync::Arc::new(pt); + let write_pt = std::sync::Arc::clone(&read_pt); + (OwnedReadPty(read_pt), OwnedWritePty(write_pt)) } } impl std::os::unix::io::AsRawFd for Pty { fn as_raw_fd(&self) -> std::os::unix::io::RawFd { - self.pt.as_raw_fd() + self.0.as_raw_fd() } } -// there is an AsyncRead impl for &Async<std::fs::File>, but without this -// explicit impl, rust finds the AsyncRead impl for Async<std::fs::File> -// first, and then complains that it requires &mut self, because method -// resolution/autoderef doesn't take mutability into account -impl futures_io::AsyncRead for &Pty { +impl tokio::io::AsyncRead for Pty { fn poll_read( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, - buf: &mut [u8], - ) -> std::task::Poll<Result<usize, std::io::Error>> { - std::pin::Pin::new(&mut &self.pt).poll_read(cx, buf) + buf: &mut tokio::io::ReadBuf, + ) -> std::task::Poll<std::io::Result<()>> { + loop { + let mut guard = match self.0.poll_read_ready(cx) { + std::task::Poll::Ready(guard) => guard, + std::task::Poll::Pending => return std::task::Poll::Pending, + }?; + let mut b = [0u8; 4096]; + match guard.try_io(|inner| (&inner.get_ref().0).read(&mut b)) { + Ok(Ok(bytes)) => { + // XXX this is safe, but not particularly efficient + buf.clear(); + buf.initialize_unfilled_to(bytes); + buf.set_filled(bytes); + buf.filled_mut().copy_from_slice(&b[..bytes]); + return std::task::Poll::Ready(Ok(())); + } + Ok(Err(e)) => return std::task::Poll::Ready(Err(e)), + Err(_would_block) => continue, + } + } } } -// same as above -impl futures_io::AsyncWrite for &Pty { +impl tokio::io::AsyncWrite for Pty { fn poll_write( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8], - ) -> std::task::Poll<Result<usize, std::io::Error>> { - std::pin::Pin::new(&mut &self.pt).poll_write(cx, buf) + ) -> std::task::Poll<std::io::Result<usize>> { + loop { + let mut guard = match self.0.poll_write_ready(cx) { + std::task::Poll::Ready(guard) => guard, + std::task::Poll::Pending => return std::task::Poll::Pending, + }?; + match guard.try_io(|inner| (&inner.get_ref().0).write(buf)) { + Ok(result) => return std::task::Poll::Ready(result), + Err(_would_block) => continue, + } + } } fn poll_flush( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<std::io::Result<()>> { + loop { + let mut guard = match self.0.poll_write_ready(cx) { + std::task::Poll::Ready(guard) => guard, + std::task::Poll::Pending => return std::task::Poll::Pending, + }?; + match guard.try_io(|inner| (&inner.get_ref().0).flush()) { + Ok(_) => return std::task::Poll::Ready(Ok(())), + Err(_would_block) => continue, + } + } + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Result<(), std::io::Error>> { - std::pin::Pin::new(&mut &self.pt).poll_flush(cx) + std::task::Poll::Ready(Ok(())) } +} + +/// The child end of the pty +/// +/// See [`Pty::pts`] and [`Command::spawn`](crate::Command::spawn) +pub struct Pts(pub(crate) crate::sys::Pts); + +/// Borrowed read half of a [`Pty`] +pub struct ReadPty<'a>(&'a AsyncPty); - fn poll_close( +impl<'a> tokio::io::AsyncRead for ReadPty<'a> { + fn poll_read( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf, + ) -> std::task::Poll<std::io::Result<()>> { + loop { + let mut guard = match self.0.poll_read_ready(cx) { + std::task::Poll::Ready(guard) => guard, + std::task::Poll::Pending => return std::task::Poll::Pending, + }?; + let mut b = [0u8; 4096]; + match guard.try_io(|inner| (&inner.get_ref().0).read(&mut b)) { + Ok(Ok(bytes)) => { + // XXX this is safe, but not particularly efficient + buf.clear(); + buf.initialize_unfilled_to(bytes); + buf.set_filled(bytes); + buf.filled_mut().copy_from_slice(&b[..bytes]); + return std::task::Poll::Ready(Ok(())); + } + Ok(Err(e)) => return std::task::Poll::Ready(Err(e)), + Err(_would_block) => continue, + } + } + } +} + +/// Borrowed write half of a [`Pty`] +pub struct WritePty<'a>(&'a AsyncPty); + +impl<'a> WritePty<'a> { + /// Change the terminal size associated with the pty. + /// + /// # Errors + /// Returns an error if we were unable to set the terminal size. + pub fn resize(&self, size: crate::Size) -> crate::Result<()> { + self.0.get_ref().set_term_size(size) + } +} + +impl<'a> tokio::io::AsyncWrite for WritePty<'a> { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll<std::io::Result<usize>> { + loop { + let mut guard = match self.0.poll_write_ready(cx) { + std::task::Poll::Ready(guard) => guard, + std::task::Poll::Pending => return std::task::Poll::Pending, + }?; + match guard.try_io(|inner| (&inner.get_ref().0).write(buf)) { + Ok(result) => return std::task::Poll::Ready(result), + Err(_would_block) => continue, + } + } + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<std::io::Result<()>> { + loop { + let mut guard = match self.0.poll_write_ready(cx) { + std::task::Poll::Ready(guard) => guard, + std::task::Poll::Pending => return std::task::Poll::Pending, + }?; + match guard.try_io(|inner| (&inner.get_ref().0).flush()) { + Ok(_) => return std::task::Poll::Ready(Ok(())), + Err(_would_block) => continue, + } + } + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), std::io::Error>> { + std::task::Poll::Ready(Ok(())) + } +} + +/// Owned read half of a [`Pty`] +#[derive(Debug)] +pub struct OwnedReadPty(std::sync::Arc<AsyncPty>); + +impl OwnedReadPty { + /// Attempt to join the two halves of a `Pty` back into a single instance. + /// The two halves must have originated from calling + /// [`into_split`](Pty::into_split) on a single instance. + /// + /// # Errors + /// Returns an error if the two halves came from different [`Pty`] + /// instances. The mismatched halves are returned as part of the error. + pub fn unsplit(self, write_half: OwnedWritePty) -> crate::Result<Pty> { + let Self(read_pt) = self; + let OwnedWritePty(write_pt) = write_half; + if std::sync::Arc::ptr_eq(&read_pt, &write_pt) { + drop(write_pt); + Ok(Pty(std::sync::Arc::try_unwrap(read_pt) + // it shouldn't be possible for more than two references to + // the same pty to exist + .unwrap_or_else(|_| unreachable!()))) + } else { + Err(crate::Error::Unsplit( + Self(read_pt), + OwnedWritePty(write_pt), + )) + } + } +} + +impl tokio::io::AsyncRead for OwnedReadPty { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf, + ) -> std::task::Poll<std::io::Result<()>> { + loop { + let mut guard = match self.0.poll_read_ready(cx) { + std::task::Poll::Ready(guard) => guard, + std::task::Poll::Pending => return std::task::Poll::Pending, + }?; + let mut b = [0u8; 4096]; + match guard.try_io(|inner| (&inner.get_ref().0).read(&mut b)) { + Ok(Ok(bytes)) => { + // XXX this is safe, but not particularly efficient + buf.clear(); + buf.initialize_unfilled_to(bytes); + buf.set_filled(bytes); + buf.filled_mut().copy_from_slice(&b[..bytes]); + return std::task::Poll::Ready(Ok(())); + } + Ok(Err(e)) => return std::task::Poll::Ready(Err(e)), + Err(_would_block) => continue, + } + } + } +} + +/// Owned write half of a [`Pty`] +#[derive(Debug)] +pub struct OwnedWritePty(std::sync::Arc<AsyncPty>); + +impl OwnedWritePty { + /// Change the terminal size associated with the pty. + /// + /// # Errors + /// Returns an error if we were unable to set the terminal size. + pub fn resize(&self, size: crate::Size) -> crate::Result<()> { + self.0.get_ref().set_term_size(size) + } +} + +impl tokio::io::AsyncWrite for OwnedWritePty { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll<std::io::Result<usize>> { + loop { + let mut guard = match self.0.poll_write_ready(cx) { + std::task::Poll::Ready(guard) => guard, + std::task::Poll::Pending => return std::task::Poll::Pending, + }?; + match guard.try_io(|inner| (&inner.get_ref().0).write(buf)) { + Ok(result) => return std::task::Poll::Ready(result), + Err(_would_block) => continue, + } + } + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<std::io::Result<()>> { + loop { + let mut guard = match self.0.poll_write_ready(cx) { + std::task::Poll::Ready(guard) => guard, + std::task::Poll::Pending => return std::task::Poll::Pending, + }?; + match guard.try_io(|inner| (&inner.get_ref().0).flush()) { + Ok(_) => return std::task::Poll::Ready(Ok(())), + Err(_would_block) => continue, + } + } + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Result<(), std::io::Error>> { - std::pin::Pin::new(&mut &self.pt).poll_close(cx) + std::task::Poll::Ready(Ok(())) } } @@ -1,83 +1,127 @@ -use std::os::unix::io::{FromRawFd as _, IntoRawFd as _}; +use std::os::unix::io::{AsRawFd as _, FromRawFd as _, IntoRawFd as _}; -pub fn create_pt() -> nix::Result<(std::fs::File, std::path::PathBuf)> { - let pt = nix::pty::posix_openpt( - nix::fcntl::OFlag::O_RDWR - | nix::fcntl::OFlag::O_NOCTTY - | nix::fcntl::OFlag::O_CLOEXEC, - )?; - nix::pty::grantpt(&pt)?; - nix::pty::unlockpt(&pt)?; +#[derive(Debug)] +pub struct Pty(pub nix::pty::PtyMaster); - let ptsname = nix::pty::ptsname_r(&pt)?.into(); +impl Pty { + pub fn open() -> crate::Result<Self> { + let pt = nix::pty::posix_openpt( + nix::fcntl::OFlag::O_RDWR + | nix::fcntl::OFlag::O_NOCTTY + | nix::fcntl::OFlag::O_CLOEXEC, + )?; + nix::pty::grantpt(&pt)?; + nix::pty::unlockpt(&pt)?; - let pt_fd = pt.into_raw_fd(); + Ok(Self(pt)) + } + + pub fn set_term_size(&self, size: crate::Size) -> crate::Result<()> { + let size = size.into(); + let fd = self.0.as_raw_fd(); - // Safety: posix_openpt (or the previous functions operating on the - // result) would have returned an Err (causing us to return early) if the - // file descriptor was invalid. additionally, into_raw_fd gives up - // ownership over the file descriptor, allowing the newly created File - // object to take full ownership. - let pt = unsafe { std::fs::File::from_raw_fd(pt_fd) }; + // Safety: nix::pty::PtyMaster is required to contain a valid file + // descriptor and size is guaranteed to be initialized because it's a + // normal rust value, and nix::pty::Winsize is a repr(C) struct with + // the same layout as `struct winsize` from sys/ioctl.h. + Ok(unsafe { + set_term_size_unsafe(fd, std::ptr::NonNull::from(&size).as_ptr()) + } + .map(|_| ())?) + } + + pub fn pts(&self) -> crate::Result<Pts> { + Ok(Pts(std::fs::OpenOptions::new() + .read(true) + .write(true) + .open(nix::pty::ptsname_r(&self.0)?)? + .into_raw_fd())) + } + + #[cfg(feature = "async")] + pub fn set_nonblocking(&self) -> nix::Result<()> { + let bits = nix::fcntl::fcntl( + self.0.as_raw_fd(), + nix::fcntl::FcntlArg::F_GETFL, + )?; + // Safety: bits was just returned from a F_GETFL call. ideally i would + // just be able to use from_bits here, but it fails for some reason? + let mut opts = + unsafe { nix::fcntl::OFlag::from_bits_unchecked(bits) }; + opts |= nix::fcntl::OFlag::O_NONBLOCK; + nix::fcntl::fcntl( + self.0.as_raw_fd(), + nix::fcntl::FcntlArg::F_SETFL(opts), + )?; + + Ok(()) + } +} - Ok((pt, ptsname)) +impl std::os::unix::io::AsRawFd for Pty { + fn as_raw_fd(&self) -> std::os::unix::io::RawFd { + self.0.as_raw_fd() + } } -pub fn set_term_size( - fh: &impl std::os::unix::io::AsRawFd, - size: crate::Size, -) -> nix::Result<()> { - let size = size.into(); - let fd = fh.as_raw_fd(); - - // Safety: std::fs::File is required to contain a valid file descriptor - // and size is guaranteed to be initialized because it's a normal rust - // value, and nix::pty::Winsize is a repr(C) struct with the same layout - // as `struct winsize` from sys/ioctl.h. - unsafe { - set_term_size_unsafe(fd, std::ptr::NonNull::from(&size).as_ptr()) +pub struct Pts(std::os::unix::io::RawFd); + +impl Pts { + pub fn setup_subprocess( + &self, + ) -> nix::Result<( + std::process::Stdio, + std::process::Stdio, + std::process::Stdio, + )> { + let pts_fd = self.0.as_raw_fd(); + + let stdin = nix::fcntl::fcntl( + pts_fd, + nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(0), + )?; + let stdout = nix::fcntl::fcntl( + pts_fd, + nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(0), + )?; + let stderr = nix::fcntl::fcntl( + pts_fd, + nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(0), + )?; + + // Safety: these file descriptors were all just returned from dup, so + // they must be valid + Ok(( + unsafe { std::process::Stdio::from_raw_fd(stdin) }, + unsafe { std::process::Stdio::from_raw_fd(stdout) }, + unsafe { std::process::Stdio::from_raw_fd(stderr) }, + )) + } + + pub fn session_leader(&self) -> impl FnMut() -> std::io::Result<()> { + let pts_fd = self.0.as_raw_fd(); + move || { + nix::unistd::setsid()?; + set_controlling_terminal(pts_fd)?; + Ok(()) + } } - .map(|_| ()) } -pub fn setup_subprocess( - pts: &impl std::os::unix::io::AsRawFd, -) -> nix::Result<( - std::process::Stdio, - std::process::Stdio, - std::process::Stdio, -)> { - let pts_fd = pts.as_raw_fd(); - - let stdin = - nix::fcntl::fcntl(pts_fd, nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(0))?; - let stdout = - nix::fcntl::fcntl(pts_fd, nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(0))?; - let stderr = - nix::fcntl::fcntl(pts_fd, nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(0))?; - - // Safety: these file descriptors were all just returned from dup, so they - // must be valid - Ok(( - unsafe { std::process::Stdio::from_raw_fd(stdin) }, - unsafe { std::process::Stdio::from_raw_fd(stdout) }, - unsafe { std::process::Stdio::from_raw_fd(stderr) }, - )) +impl Drop for Pts { + fn drop(&mut self) { + let _ = nix::unistd::close(self.0); + } } -pub fn session_leader( - pts: &impl std::os::unix::io::AsRawFd, -) -> impl FnMut() -> std::io::Result<()> { - let pts_fd = pts.as_raw_fd(); - move || { - nix::unistd::setsid()?; - set_controlling_terminal(pts_fd)?; - Ok(()) +impl std::os::unix::io::AsRawFd for Pts { + fn as_raw_fd(&self) -> std::os::unix::io::RawFd { + self.0 } } fn set_controlling_terminal(fd: std::os::unix::io::RawFd) -> nix::Result<()> { - // Safety: std::fs::File is required to contain a valid file descriptor + // Safety: Pts is required to contain a valid file descriptor unsafe { set_controlling_terminal_unsafe(fd, std::ptr::null()) } .map(|_| ()) } diff --git a/tests/basic.rs b/tests/basic.rs index cab2c16..6f276e6 100644 --- a/tests/basic.rs +++ b/tests/basic.rs @@ -4,42 +4,50 @@ mod helpers; fn test_cat_blocking() { use std::io::Write as _; - let pty = pty_process::blocking::Pty::new().unwrap(); + let mut pty = pty_process::blocking::Pty::new().unwrap(); pty.resize(pty_process::Size::new(24, 80)).unwrap(); let mut child = pty_process::blocking::Command::new("cat") - .spawn(&pty) + .spawn(&pty.pts().unwrap()) .unwrap(); - (&pty).write_all(b"foo\n").unwrap(); + pty.write_all(b"foo\n").unwrap(); let mut output = helpers::output(&pty); assert_eq!(output.next().unwrap(), "foo\r\n"); assert_eq!(output.next().unwrap(), "foo\r\n"); - (&pty).write_all(&[4u8]).unwrap(); + pty.write_all(&[4u8]).unwrap(); let status = child.wait().unwrap(); assert_eq!(status.code().unwrap(), 0); } #[cfg(feature = "async")] #[test] -fn test_cat_async_std() { - use async_std::io::prelude::WriteExt as _; +fn test_cat_async() { use futures::stream::StreamExt as _; - - let status = async_std::task::block_on(async { - let pty = pty_process::Pty::new().unwrap(); - pty.resize(pty_process::Size::new(24, 80)).unwrap(); - let mut child = pty_process::Command::new("cat").spawn(&pty).unwrap(); - - (&pty).write_all(b"foo\n").await.unwrap(); - - let mut output = helpers::output_async(&pty); - assert_eq!(output.next().await.unwrap(), "foo\r\n"); - assert_eq!(output.next().await.unwrap(), "foo\r\n"); - - (&pty).write_all(&[4u8]).await.unwrap(); - child.status().await.unwrap() - }); + use tokio::io::AsyncWriteExt as _; + + let status = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + let mut pty = pty_process::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); + pty.resize(pty_process::Size::new(24, 80)).unwrap(); + let mut child = + pty_process::Command::new("cat").spawn(&pts).unwrap(); + + let (pty_r, mut pty_w) = pty.split(); + + pty_w.write_all(b"foo\n").await.unwrap(); + + let mut output = helpers::output_async(pty_r); + assert_eq!(output.next().await.unwrap(), "foo\r\n"); + assert_eq!(output.next().await.unwrap(), "foo\r\n"); + + pty_w.write_all(&[4u8]).await.unwrap(); + child.wait().await.unwrap() + }); assert_eq!(status.code().unwrap(), 0); } diff --git a/tests/behavior.rs b/tests/behavior.rs index d45dd14..7969f72 100644 --- a/tests/behavior.rs +++ b/tests/behavior.rs @@ -3,11 +3,12 @@ mod helpers; #[test] fn test_multiple() { let pty = pty_process::blocking::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); pty.resize(pty_process::Size::new(24, 80)).unwrap(); let mut child = pty_process::blocking::Command::new("echo") .arg("foo") - .spawn(&pty) + .spawn(&pts) .unwrap(); let mut output = helpers::output(&pty); @@ -18,10 +19,9 @@ fn test_multiple() { let mut child = pty_process::blocking::Command::new("echo") .arg("bar") - .spawn(&pty) + .spawn(&pts) .unwrap(); - let mut output = helpers::output(&pty); assert_eq!(output.next().unwrap(), "bar\r\n"); let status = child.wait().unwrap(); @@ -33,32 +33,37 @@ fn test_multiple() { fn test_multiple_async() { use futures::stream::StreamExt as _; - async_std::task::block_on(async { - let pty = pty_process::Pty::new().unwrap(); - pty.resize(pty_process::Size::new(24, 80)).unwrap(); + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + let mut pty = pty_process::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); + pty.resize(pty_process::Size::new(24, 80)).unwrap(); - let mut child = pty_process::Command::new("echo") - .arg("foo") - .spawn(&pty) - .unwrap(); + let mut child = pty_process::Command::new("echo") + .arg("foo") + .spawn(&pts) + .unwrap(); + let (pty_r, _) = pty.split(); - let mut output = helpers::output_async(&pty); - assert_eq!(output.next().await.unwrap(), "foo\r\n"); + let mut output = helpers::output_async(pty_r); + assert_eq!(output.next().await.unwrap(), "foo\r\n"); - let status = child.status().await.unwrap(); - assert_eq!(status.code().unwrap(), 0); + let status = child.wait().await.unwrap(); + assert_eq!(status.code().unwrap(), 0); - let mut child = pty_process::Command::new("echo") - .arg("bar") - .spawn(&pty) - .unwrap(); + let mut child = pty_process::Command::new("echo") + .arg("bar") + .spawn(&pts) + .unwrap(); - let mut output = helpers::output_async(&pty); - assert_eq!(output.next().await.unwrap(), "bar\r\n"); + assert_eq!(output.next().await.unwrap(), "bar\r\n"); - let status = child.status().await.unwrap(); - assert_eq!(status.code().unwrap(), 0); - }); + let status = child.wait().await.unwrap(); + assert_eq!(status.code().unwrap(), 0); + }); } #[test] @@ -67,6 +72,7 @@ fn test_multiple_configured() { use std::os::unix::io::FromRawFd as _; let pty = pty_process::blocking::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); pty.resize(pty_process::Size::new(24, 80)).unwrap(); let (stderr_pipe_r, stderr_pipe_w) = nix::unistd::pipe().unwrap(); @@ -90,7 +96,7 @@ fn test_multiple_configured() { Ok(()) }); } - let mut child = cmd.spawn(&pty).unwrap(); + let mut child = cmd.spawn(&pts).unwrap(); let mut output = helpers::output(&pty); assert_eq!(output.next().unwrap(), "foo\r\n"); @@ -110,7 +116,8 @@ fn test_multiple_configured() { let status = child.wait().unwrap(); assert_eq!(status.code().unwrap(), 0); - let mut child = cmd.spawn(&pty).unwrap(); + let mut child = cmd.spawn(&pts).unwrap(); + let mut output = helpers::output(&pty); assert_eq!(output.next().unwrap(), "foo\r\n"); @@ -133,105 +140,126 @@ fn test_multiple_configured() { #[cfg(feature = "async")] #[test] fn test_multiple_configured_async() { - use async_std::io::prelude::BufReadExt as _; use futures::stream::StreamExt as _; use std::os::unix::io::FromRawFd as _; + use tokio::io::AsyncBufReadExt as _; - async_std::task::block_on(async { - let pty = pty_process::Pty::new().unwrap(); - pty.resize(pty_process::Size::new(24, 80)).unwrap(); - - let (stderr_pipe_r, stderr_pipe_w) = nix::unistd::pipe().unwrap(); - let mut stderr_pipe_r = async_std::io::BufReader::new(unsafe { - async_std::fs::File::from_raw_fd(stderr_pipe_r) - }); - let (pre_exec_pipe_r, pre_exec_pipe_w) = nix::unistd::pipe().unwrap(); - let mut pre_exec_pipe_r = async_std::io::BufReader::new(unsafe { - async_std::fs::File::from_raw_fd(pre_exec_pipe_r) - }); - let mut cmd = pty_process::Command::new("perl"); - cmd.arg("-Esay 'foo'; say STDERR 'foo-stderr'; open my $fh, '>&=3'; say $fh 'foo-3';") - .stderr(unsafe { std::process::Stdio::from_raw_fd(stderr_pipe_w) }); - unsafe { - cmd.pre_exec(move || { - nix::unistd::dup2(pre_exec_pipe_w, 3)?; - nix::fcntl::fcntl( - 3, - nix::fcntl::F_SETFD(nix::fcntl::FdFlag::empty()), - )?; - Ok(()) - }); - } - let mut child = cmd.spawn(&pty).unwrap(); - - let mut output = helpers::output_async(&pty); - assert_eq!(output.next().await.unwrap(), "foo\r\n"); - - let mut buf = vec![]; - async_std::future::timeout( - std::time::Duration::from_secs(5), - stderr_pipe_r.read_until(b'\n', &mut buf), - ) - .await - .unwrap() - .unwrap(); - assert_eq!( - std::string::String::from_utf8(buf).unwrap(), - "foo-stderr\n" - ); - - let mut buf = vec![]; - async_std::future::timeout( - std::time::Duration::from_secs(5), - pre_exec_pipe_r.read_until(b'\n', &mut buf), - ) - .await + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() .unwrap() - .unwrap(); - assert_eq!(std::string::String::from_utf8(buf).unwrap(), "foo-3\n"); + .block_on(async { + let mut pty = pty_process::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); + pty.resize(pty_process::Size::new(24, 80)).unwrap(); + let (pty_r, _) = pty.split(); + + let (stderr_pipe_r, stderr_pipe_w) = nix::unistd::pipe().unwrap(); + let mut stderr_pipe_r = tokio::io::BufReader::new(unsafe { + tokio::fs::File::from_raw_fd(stderr_pipe_r) + }); + let (pre_exec_pipe_r, pre_exec_pipe_w) = + nix::unistd::pipe().unwrap(); + let mut pre_exec_pipe_r = tokio::io::BufReader::new(unsafe { + tokio::fs::File::from_raw_fd(pre_exec_pipe_r) + }); + let mut cmd = pty_process::Command::new("perl"); + cmd.arg( + "-Esay 'foo'; \ + say STDERR 'foo-stderr'; \ + open my $fh, '>&=3'; \ + say $fh 'foo-3';", + ) + .stderr(unsafe { + std::process::Stdio::from_raw_fd(stderr_pipe_w) + }); + unsafe { + cmd.pre_exec(move || { + nix::unistd::dup2(pre_exec_pipe_w, 3)?; + nix::fcntl::fcntl( + 3, + nix::fcntl::F_SETFD(nix::fcntl::FdFlag::empty()), + )?; + Ok(()) + }); + } + let mut child = cmd.spawn(&pts).unwrap(); + + let mut output = helpers::output_async(pty_r); + assert_eq!(output.next().await.unwrap(), "foo\r\n"); + + let mut buf = vec![]; + tokio::time::timeout( + std::time::Duration::from_secs(5), + stderr_pipe_r.read_until(b'\n', &mut buf), + ) + .await + .unwrap() + .unwrap(); + assert_eq!( + std::string::String::from_utf8(buf).unwrap(), + "foo-stderr\n" + ); + + let mut buf = vec![]; + tokio::time::timeout( + std::time::Duration::from_secs(5), + pre_exec_pipe_r.read_until(b'\n', &mut buf), + ) + .await + .unwrap() + .unwrap(); + assert_eq!( + std::string::String::from_utf8(buf).unwrap(), + "foo-3\n" + ); - let status = child.status().await.unwrap(); - assert_eq!(status.code().unwrap(), 0); + let status = child.wait().await.unwrap(); + assert_eq!(status.code().unwrap(), 0); - let mut child = cmd.spawn(&pty).unwrap(); + let mut child = cmd.spawn(&pts).unwrap(); - assert_eq!(output.next().await.unwrap(), "foo\r\n"); + assert_eq!(output.next().await.unwrap(), "foo\r\n"); - let mut buf = vec![]; - async_std::future::timeout( - std::time::Duration::from_secs(5), - stderr_pipe_r.read_until(b'\n', &mut buf), - ) - .await - .unwrap() - .unwrap(); - assert_eq!( - std::string::String::from_utf8(buf).unwrap(), - "foo-stderr\n" - ); - - let mut buf = vec![]; - async_std::future::timeout( - std::time::Duration::from_secs(5), - pre_exec_pipe_r.read_until(b'\n', &mut buf), - ) - .await - .unwrap() - .unwrap(); - assert_eq!(std::string::String::from_utf8(buf).unwrap(), "foo-3\n"); + let mut buf = vec![]; + tokio::time::timeout( + std::time::Duration::from_secs(5), + stderr_pipe_r.read_until(b'\n', &mut buf), + ) + .await + .unwrap() + .unwrap(); + assert_eq!( + std::string::String::from_utf8(buf).unwrap(), + "foo-stderr\n" + ); + + let mut buf = vec![]; + tokio::time::timeout( + std::time::Duration::from_secs(5), + pre_exec_pipe_r.read_until(b'\n', &mut buf), + ) + .await + .unwrap() + .unwrap(); + assert_eq!( + std::string::String::from_utf8(buf).unwrap(), + "foo-3\n" + ); - let status = child.status().await.unwrap(); - assert_eq!(status.code().unwrap(), 0); - }); + let status = child.wait().await.unwrap(); + assert_eq!(status.code().unwrap(), 0); + }); } #[test] fn test_controlling_terminal() { let pty = pty_process::blocking::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); pty.resize(pty_process::Size::new(24, 80)).unwrap(); let mut child = pty_process::blocking::Command::new("perl") .arg("-Eopen my $fh, '<', '/dev/tty' or die; if (-t $fh) { say 'true' } else { say 'false' }") - .spawn(&pty) + .spawn(&pts) .unwrap(); let mut output = helpers::output(&pty); @@ -246,29 +274,39 @@ fn test_controlling_terminal() { fn test_controlling_terminal_async() { use futures::stream::StreamExt as _; - async_std::task::block_on(async { - let pty = pty_process::Pty::new().unwrap(); - pty.resize(pty_process::Size::new(24, 80)).unwrap(); - let mut child = pty_process::Command::new("perl") - .arg("-Eopen my $fh, '<', '/dev/tty' or die; if (-t $fh) { say 'true' } else { say 'false' }") - .spawn(&pty) - .unwrap(); - - let mut output = helpers::output_async(&pty); - assert_eq!(output.next().await.unwrap(), "true\r\n"); - - let status = child.status().await.unwrap(); - assert_eq!(status.code().unwrap(), 0); - }); + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + let mut pty = pty_process::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); + pty.resize(pty_process::Size::new(24, 80)).unwrap(); + let (pty_r, _) = pty.split(); + let mut child = pty_process::Command::new("perl") + .arg( + "-Eopen my $fh, '<', '/dev/tty' or die; \ + if (-t $fh) { say 'true' } else { say 'false' }", + ) + .spawn(&pts) + .unwrap(); + + let mut output = helpers::output_async(pty_r); + assert_eq!(output.next().await.unwrap(), "true\r\n"); + + let status = child.wait().await.unwrap(); + assert_eq!(status.code().unwrap(), 0); + }); } #[test] fn test_session_leader() { let pty = pty_process::blocking::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); pty.resize(pty_process::Size::new(24, 80)).unwrap(); let mut child = pty_process::blocking::Command::new("python") .arg("-cimport os; print(os.getpid() == os.getsid(0))") - .spawn(&pty) + .spawn(&pts) .unwrap(); let mut output = helpers::output(&pty); @@ -283,20 +321,25 @@ fn test_session_leader() { fn test_session_leader_async() { use futures::stream::StreamExt as _; - async_std::task::block_on(async { - let pty = pty_process::Pty::new().unwrap(); - pty.resize(pty_process::Size::new(24, 80)).unwrap(); - let mut child = pty_process::Command::new("python") - .arg("-cimport os; print(os.getpid() == os.getsid(0))") - .spawn(&pty) - .unwrap(); - - { - let mut output = helpers::output_async(&pty); + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + let mut pty = pty_process::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); + pty.resize(pty_process::Size::new(24, 80)).unwrap(); + let mut child = pty_process::Command::new("python") + .arg("-cimport os; print(os.getpid() == os.getsid(0))") + .spawn(&pts) + .unwrap(); + + let (pty_r, _) = pty.split(); + let mut output = helpers::output_async(pty_r); assert_eq!(output.next().await.unwrap(), "True\r\n"); - } - let status = child.status().await.unwrap(); - assert_eq!(status.code().unwrap(), 0); - }); + let status = child.wait().await.unwrap(); + eprintln!("{:?}", status); + assert_eq!(status.code().unwrap(), 0); + }); } diff --git a/tests/fds.rs b/tests/fds.rs index 8b64d3f..a641f46 100644 --- a/tests/fds.rs +++ b/tests/fds.rs @@ -5,10 +5,11 @@ fn test_fds() { check_open_fds(); let pty = pty_process::blocking::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); pty.resize(pty_process::Size::new(24, 80)).unwrap(); let mut child = pty_process::blocking::Command::new("perl") .arg("-Efor my $fd (0..255) { open my $fh, \"<&=$fd\"; print $fd if stat $fh }; say") - .spawn(&pty) + .spawn(&pts) .unwrap(); let mut output = helpers::output(&pty); @@ -17,14 +18,16 @@ fn test_fds() { let status = child.wait().unwrap(); assert_eq!(status.code().unwrap(), 0); drop(pty); + drop(pts); check_open_fds(); let pty = pty_process::blocking::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); pty.resize(pty_process::Size::new(24, 80)).unwrap(); let mut child = pty_process::blocking::Command::new("perl") .arg("-Efor my $fd (0..255) { open my $fh, \"<&=$fd\"; print $fd if stat $fh }; say") .stderr(std::process::Stdio::null()) - .spawn(&pty) + .spawn(&pts) .unwrap(); let mut output = helpers::output(&pty); @@ -33,6 +36,7 @@ fn test_fds() { let status = child.wait().unwrap(); assert_eq!(status.code().unwrap(), 0); drop(pty); + drop(pts); check_open_fds(); } diff --git a/tests/fds_async.rs b/tests/fds_async.rs index 9e0bc15..6f3bcde 100644 --- a/tests/fds_async.rs +++ b/tests/fds_async.rs @@ -7,61 +7,86 @@ fn test_fds_async() { check_open_fds(&[0, 1, 2]); - // run once to ensure all of the fds in the async_std machinery are + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + // run once to ensure all of the fds in the tokio machinery are // allocated - async_std::task::block_on(async { - let pty = pty_process::Pty::new().unwrap(); + rt.block_on(async { + let mut pty = pty_process::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); pty.resize(pty_process::Size::new(24, 80)).unwrap(); let mut child = pty_process::Command::new("perl") - .arg("-Efor my $fd (0..255) { open my $fh, \"<&=$fd\"; print $fd if stat $fh }; say") - .spawn(&pty) + .arg( + "-Efor my $fd (0..255) { \ + open my $fh, \"<&=$fd\"; \ + print $fd if stat $fh \ + }; \ + say", + ) + .spawn(&pts) .unwrap(); - let mut output = helpers::output_async(&pty); + let (pty_r, _) = pty.split(); + let mut output = helpers::output_async(pty_r); assert_eq!(output.next().await.unwrap(), "012\r\n"); - let status = child.status().await.unwrap(); + let status = child.wait().await.unwrap(); assert_eq!(status.code().unwrap(), 0); }); - async_std::task::block_on(async { + rt.block_on(async { let fds = get_open_fds(); - let pty = pty_process::Pty::new().unwrap(); + let mut pty = pty_process::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); pty.resize(pty_process::Size::new(24, 80)).unwrap(); let mut child = pty_process::Command::new("perl") - .arg("-Efor my $fd (0..255) { open my $fh, \"<&=$fd\"; print $fd if stat $fh }; say") - .spawn(&pty) + .arg( + "-Efor my $fd (0..255) { \ + open my $fh, \"<&=$fd\"; \ + print $fd if stat $fh \ + }; \ + say", + ) + .spawn(&pts) .unwrap(); - let mut output = helpers::output_async(&pty); + let (pty_r, _) = pty.split(); + let mut output = helpers::output_async(pty_r); assert_eq!(output.next().await.unwrap(), "012\r\n"); - let status = child.status().await.unwrap(); + let status = child.wait().await.unwrap(); assert_eq!(status.code().unwrap(), 0); drop(output); + drop(pts); drop(pty); check_open_fds(&fds); }); - async_std::task::block_on(async { + rt.block_on(async { let fds = get_open_fds(); - let pty = pty_process::Pty::new().unwrap(); + let mut pty = pty_process::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); pty.resize(pty_process::Size::new(24, 80)).unwrap(); let mut child = pty_process::Command::new("perl") .arg("-Efor my $fd (0..255) { open my $fh, \"<&=$fd\"; print $fd if stat $fh }; say") .stderr(std::process::Stdio::null()) - .spawn(&pty) + .spawn(&pts) .unwrap(); - let mut output = helpers::output_async(&pty); + let (pty_r, _) = pty.split(); + let mut output = helpers::output_async(pty_r); assert_eq!(output.next().await.unwrap(), "012\r\n"); - let status = child.status().await.unwrap(); + let status = child.wait().await.unwrap(); assert_eq!(status.code().unwrap(), 0); drop(output); + drop(pts); drop(pty); check_open_fds(&fds); diff --git a/tests/helpers/mod.rs b/tests/helpers/mod.rs index 4fee8df..a46a12b 100644 --- a/tests/helpers/mod.rs +++ b/tests/helpers/mod.rs @@ -31,23 +31,20 @@ pub fn output(pty: &pty_process::blocking::Pty) -> Output<'_> { } #[cfg(feature = "async")] -pub fn output_async( - pty: &pty_process::Pty, -) -> std::pin::Pin<Box<dyn futures::stream::Stream<Item = String> + '_>> { - use async_std::io::prelude::BufReadExt as _; +pub fn output_async<'a>( + pty: impl tokio::io::AsyncRead + std::marker::Unpin + 'a, +) -> std::pin::Pin<Box<dyn futures::stream::Stream<Item = String> + 'a>> { use futures::FutureExt as _; + use tokio::io::AsyncBufReadExt as _; - let pty = async_std::io::BufReader::new(pty); + let pty = tokio::io::BufReader::new(pty); Box::pin(futures::stream::unfold(pty, |mut pty| async move { Some(( - async_std::future::timeout( - std::time::Duration::from_secs(5), - async { - let mut buf = vec![]; - pty.read_until(b'\n', &mut buf).await.unwrap(); - std::string::String::from_utf8(buf).unwrap() - }, - ) + tokio::time::timeout(std::time::Duration::from_secs(5), async { + let mut buf = vec![]; + pty.read_until(b'\n', &mut buf).await.unwrap(); + std::string::String::from_utf8(buf).unwrap() + }) .map(|x| x.unwrap()) .await, pty, diff --git a/tests/pipe.rs b/tests/pipe.rs index 1bf49f2..e09334b 100644 --- a/tests/pipe.rs +++ b/tests/pipe.rs @@ -2,7 +2,8 @@ fn test_pipe_basic() { use std::os::unix::io::FromRawFd as _; - let (read_fd, write_fd) = nix::unistd::pipe().unwrap(); + let (read_fd, write_fd) = + nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).unwrap(); let mut child_from = std::process::Command::new("seq"); child_from.args(["1", "10"]); @@ -13,56 +14,96 @@ fn test_pipe_basic() { child_to.stdout(std::process::Stdio::piped()); assert!(child_from.status().unwrap().success()); - nix::unistd::close(write_fd).unwrap(); + drop(child_from); let output = child_to.output().unwrap(); assert!(output.status.success()); assert_eq!(output.stdout, b"10\n9\n8\n7\n6\n5\n4\n3\n2\n1\n"); } -#[cfg(feature = "todo")] -// TODO (hangs because i'm still overriding the configured fds) -// #[test] +#[test] +fn test_pipe_blocking() { + use std::io::Read as _; + use std::os::unix::io::FromRawFd as _; + + let (read_fd, write_fd) = + nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).unwrap(); + + let pty_from = pty_process::blocking::Pty::new().unwrap(); + let pts_from = pty_from.pts().unwrap(); + pty_from.resize(pty_process::Size::new(24, 80)).unwrap(); + let mut cmd_from = pty_process::blocking::Command::new("seq"); + cmd_from.args(["1", "10"]); + cmd_from.stdout(unsafe { std::process::Stdio::from_raw_fd(write_fd) }); + let mut child_from = cmd_from.spawn(&pts_from).unwrap(); + + let mut pty_to = pty_process::blocking::Pty::new().unwrap(); + let pts_to = pty_to.pts().unwrap(); + let mut cmd_to = pty_process::blocking::Command::new("tac"); + cmd_to.stdin(unsafe { std::process::Stdio::from_raw_fd(read_fd) }); + let mut child_to = cmd_to.spawn(&pts_to).unwrap(); + + assert!(child_from.wait().unwrap().success()); + drop(cmd_from); + + // wait for the `tac` process to finish generating output (we don't really + // have a good way to detect when that happens) + std::thread::sleep(std::time::Duration::from_millis(100)); + + let mut buf = [0u8; 1024]; + let bytes = pty_to.read(&mut buf).unwrap(); + assert_eq!( + &buf[..bytes], + b"10\r\n9\r\n8\r\n7\r\n6\r\n5\r\n4\r\n3\r\n2\r\n1\r\n" + ); + + assert!(child_to.wait().unwrap().success()); +} + +#[cfg(feature = "async")] +#[test] fn test_pipe_async() { - use async_std::io::ReadExt as _; use std::os::unix::io::FromRawFd as _; + use tokio::io::AsyncReadExt as _; + + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + let (read_fd, write_fd) = + nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).unwrap(); + + let pty_from = pty_process::Pty::new().unwrap(); + let pts_from = pty_from.pts().unwrap(); + pty_from.resize(pty_process::Size::new(24, 80)).unwrap(); + let mut cmd_from = pty_process::Command::new("seq"); + cmd_from.args(["1", "10"]); + cmd_from.stdout(unsafe { + std::process::Stdio::from_raw_fd(write_fd) + }); + let mut child_from = cmd_from.spawn(&pts_from).unwrap(); + + let mut pty_to = pty_process::Pty::new().unwrap(); + let pts_to = pty_to.pts().unwrap(); + let mut cmd_to = pty_process::Command::new("tac"); + cmd_to + .stdin(unsafe { std::process::Stdio::from_raw_fd(read_fd) }); + let mut child_to = cmd_to.spawn(&pts_to).unwrap(); + + assert!(child_from.wait().await.unwrap().success()); + drop(cmd_from); + + // wait for the `tac` process to finish generating output (we + // don't really have a good way to detect when that happens) + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let mut buf = [0u8; 1024]; + let bytes = pty_to.read(&mut buf).await.unwrap(); + assert_eq!( + &buf[..bytes], + b"10\r\n9\r\n8\r\n7\r\n6\r\n5\r\n4\r\n3\r\n2\r\n1\r\n" + ); - let (status_from, status_to) = async_std::task::block_on(async { - let (read_fd, write_fd) = nix::unistd::pipe().unwrap(); - - let pty_from = pty_process::async_std::Pty::new().unwrap(); - pty_from.resize(pty_process::Size::new(24, 80)).unwrap(); - let mut cmd_from = pty_process::async_std::Command::new("seq"); - cmd_from.args(["1", "10"]); - cmd_from - .stdout(unsafe { std::process::Stdio::from_raw_fd(write_fd) }); - let mut child_from = cmd_from.spawn(pty_from).unwrap(); - - let pty_to = pty_process::async_std::Pty::new().unwrap(); - pty_to.resize(pty_process::Size::new(24, 80)).unwrap(); - let mut cmd_to = pty_process::async_std::Command::new("tac"); - cmd_to.stdin(unsafe { std::process::Stdio::from_raw_fd(read_fd) }); - let mut child_to = cmd_to.spawn(pty_to).unwrap(); - - // the pty will echo the written bytes back immediately, but the - // subprocess needs to generate its own output, which takes time, so - // we can't just read immediately (we may just get the echoed bytes). - // because the output generation is happening in the subprocess, we - // also don't have any way to know when (or if!) the subprocess will - // decide to send its output, so sleeping is the best we can do. - async_std::task::sleep(std::time::Duration::from_secs(1)).await; - - let mut buf = [0u8; 1024]; - let bytes = child_to.pty().read(&mut buf).await.unwrap(); - assert_eq!( - &buf[..bytes], - b"10\r\n9\r\n8\r\n7\r\n6\r\n5\r\n4\r\n3\r\n2\r\n1\r\n" - ); - - ( - child_from.status().await.unwrap(), - child_to.status().await.unwrap(), - ) - }); - assert_eq!(status_from.code().unwrap(), 0); - assert_eq!(status_to.code().unwrap(), 0); + assert!(child_to.wait().await.unwrap().success()); + }); } diff --git a/tests/split.rs b/tests/split.rs new file mode 100644 index 0000000..f537f17 --- /dev/null +++ b/tests/split.rs @@ -0,0 +1,155 @@ +mod helpers; + +#[cfg(feature = "async")] +#[test] +fn test_split() { + use futures::stream::StreamExt as _; + use tokio::io::AsyncWriteExt as _; + + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async { + let mut pty = pty_process::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); + pty.resize(pty_process::Size::new(24, 80)).unwrap(); + let mut cmd = pty_process::Command::new("perl"); + cmd.args(["-plE", "BEGIN { $SIG{WINCH} = sub { say 'WINCH' } }"]); + let mut child = cmd.spawn(&pts).unwrap(); + + { + pty.write_all(b"foo\n").await.unwrap(); + let (pty_r, _) = pty.split(); + let mut output = helpers::output_async(pty_r); + assert_eq!(output.next().await.unwrap(), "foo\r\n"); + assert_eq!(output.next().await.unwrap(), "foo\r\n"); + } + + { + let (pty_r, mut pty_w) = pty.split(); + pty_w.write_all(b"foo\n").await.unwrap(); + let mut output = helpers::output_async(pty_r); + assert_eq!(output.next().await.unwrap(), "foo\r\n"); + assert_eq!(output.next().await.unwrap(), "foo\r\n"); + } + + { + let (pty_r, pty_w) = pty.split(); + pty_w.resize(pty_process::Size::new(25, 80)).unwrap(); + let mut output = helpers::output_async(pty_r); + assert_eq!(output.next().await.unwrap(), "WINCH\r\n"); + } + + pty.write_all(&[4u8]).await.unwrap(); + child.wait().await.unwrap() + }); +} + +#[cfg(feature = "async")] +#[test] +fn test_into_split() { + use tokio::io::{AsyncBufReadExt as _, AsyncWriteExt as _}; + + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async { + let mut pty = pty_process::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); + pty.resize(pty_process::Size::new(24, 80)).unwrap(); + let mut cmd = pty_process::Command::new("perl"); + cmd.args(["-plE", "BEGIN { $SIG{WINCH} = sub { say 'WINCH' } }"]); + let mut child = cmd.spawn(&pts).unwrap(); + + { + pty.write_all(b"foo\n").await.unwrap(); + let (pty_r, pty_w) = pty.into_split(); + let mut ptybuf = tokio::io::BufReader::new(pty_r); + for _ in 0..2 { + let mut buf = vec![]; + tokio::time::timeout( + std::time::Duration::from_secs(5), + ptybuf.read_until(b'\n', &mut buf), + ) + .await + .unwrap() + .unwrap(); + assert_eq!(&buf[..], b"foo\r\n"); + } + pty = ptybuf.into_inner().unsplit(pty_w).unwrap(); + } + + { + let (pty_r, mut pty_w) = pty.into_split(); + pty_w.write_all(b"foo\n").await.unwrap(); + let mut ptybuf = tokio::io::BufReader::new(pty_r); + for _ in 0..2 { + let mut buf = vec![]; + tokio::time::timeout( + std::time::Duration::from_secs(5), + ptybuf.read_until(b'\n', &mut buf), + ) + .await + .unwrap() + .unwrap(); + assert_eq!(&buf[..], b"foo\r\n"); + } + pty = ptybuf.into_inner().unsplit(pty_w).unwrap(); + } + + { + let (pty_r, pty_w) = pty.into_split(); + pty_w.resize(pty_process::Size::new(25, 80)).unwrap(); + let mut ptybuf = tokio::io::BufReader::new(pty_r); + let mut buf = vec![]; + tokio::time::timeout( + std::time::Duration::from_secs(5), + ptybuf.read_until(b'\n', &mut buf), + ) + .await + .unwrap() + .unwrap(); + assert_eq!(&buf[..], b"WINCH\r\n"); + pty = ptybuf.into_inner().unsplit(pty_w).unwrap(); + } + + pty.write_all(&[4u8]).await.unwrap(); + child.wait().await.unwrap() + }); +} + +#[cfg(feature = "async")] +#[test] +fn test_into_split_error() { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async { + let pty1 = pty_process::Pty::new().unwrap(); + let pty2 = pty_process::Pty::new().unwrap(); + + let (pty1_r, pty1_w) = pty1.into_split(); + let (pty2_r, pty2_w) = pty2.into_split(); + + let (pty1_r, pty2_w) = if let Err(pty_process::Error::Unsplit(r, w)) = + pty1_r.unsplit(pty2_w) + { + (r, w) + } else { + panic!("fail"); + }; + let (pty2_r, pty1_w) = if let Err(pty_process::Error::Unsplit(r, w)) = + pty2_r.unsplit(pty1_w) + { + (r, w) + } else { + panic!("fail"); + }; + + let _pty1 = pty1_r.unsplit(pty1_w).unwrap(); + let _pty2 = pty2_r.unsplit(pty2_w).unwrap(); + }); +} diff --git a/tests/winch.rs b/tests/winch.rs index 50447cb..6b16ca9 100644 --- a/tests/winch.rs +++ b/tests/winch.rs @@ -4,14 +4,15 @@ mod helpers; fn test_winch_std() { use std::io::Write as _; - let pty = pty_process::blocking::Pty::new().unwrap(); + let mut pty = pty_process::blocking::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); pty.resize(pty_process::Size::new(24, 80)).unwrap(); let mut child = pty_process::blocking::Command::new("perl") .args(&[ "-E", "$|++; $SIG{WINCH} = sub { say 'WINCH' }; say 'started'; <>", ]) - .spawn(&pty) + .spawn(&pts) .unwrap(); let mut output = helpers::output(&pty); @@ -20,7 +21,7 @@ fn test_winch_std() { pty.resize(pty_process::Size::new(25, 80)).unwrap(); assert_eq!(output.next().unwrap(), "WINCH\r\n"); - (&pty).write_all(b"\n").unwrap(); + pty.write_all(b"\n").unwrap(); let status = child.wait().unwrap(); assert_eq!(status.code().unwrap(), 0); } @@ -28,28 +29,34 @@ fn test_winch_std() { #[cfg(feature = "async")] #[test] fn test_winch_async() { - use async_std::io::prelude::WriteExt as _; use futures::stream::StreamExt as _; - - let status = async_std::task::block_on(async { - let pty = pty_process::Pty::new().unwrap(); - pty.resize(pty_process::Size::new(24, 80)).unwrap(); - let mut child = pty_process::Command::new("perl") + use tokio::io::AsyncWriteExt as _; + + let status = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + let mut pty = pty_process::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); + pty.resize(pty_process::Size::new(24, 80)).unwrap(); + let mut child = pty_process::Command::new("perl") .args(&[ "-E", "$|++; $SIG{WINCH} = sub { say 'WINCH' }; say 'started'; <>", ]) - .spawn(&pty) + .spawn(&pts) .unwrap(); - let mut output = helpers::output_async(&pty); - assert_eq!(output.next().await.unwrap(), "started\r\n"); + let (pty_r, mut pty_w) = pty.split(); + let mut output = helpers::output_async(pty_r); + assert_eq!(output.next().await.unwrap(), "started\r\n"); - pty.resize(pty_process::Size::new(25, 80)).unwrap(); - assert_eq!(output.next().await.unwrap(), "WINCH\r\n"); + pty_w.resize(pty_process::Size::new(25, 80)).unwrap(); + assert_eq!(output.next().await.unwrap(), "WINCH\r\n"); - (&pty).write_all(b"\n").await.unwrap(); - child.status().await.unwrap() - }); + pty_w.write_all(b"\n").await.unwrap(); + child.wait().await.unwrap() + }); assert_eq!(status.code().unwrap(), 0); } |