aboutsummaryrefslogtreecommitdiffstats
path: root/src/pty.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/pty.rs')
-rw-r--r--src/pty.rs328
1 files changed, 288 insertions, 40 deletions
diff --git a/src/pty.rs b/src/pty.rs
index 1d7f113..87fb311 100644
--- a/src/pty.rs
+++ b/src/pty.rs
@@ -1,8 +1,11 @@
+#![allow(clippy::module_name_repetitions)]
+
+use std::io::{Read as _, Write as _};
+
+type AsyncPty = tokio::io::unix::AsyncFd<crate::sys::Pty>;
+
/// An allocated pty
-pub struct Pty {
- pt: async_io::Async<std::fs::File>,
- pts: std::fs::File,
-}
+pub struct Pty(AsyncPty);
impl Pty {
/// Allocate and return a new pty.
@@ -11,13 +14,9 @@ impl Pty {
/// Returns an error if the pty failed to be allocated, or if we were
/// unable to put it into non-blocking mode.
pub fn new() -> crate::Result<Self> {
- let (pt, ptsname) = crate::sys::create_pt()?;
- let pt = async_io::Async::new(pt)?;
- let pts = std::fs::OpenOptions::new()
- .read(true)
- .write(true)
- .open(&ptsname)?;
- Ok(Self { pt, pts })
+ let pty = crate::sys::Pty::open()?;
+ pty.set_nonblocking()?;
+ Ok(Self(tokio::io::unix::AsyncFd::new(pty)?))
}
/// Change the terminal size associated with the pty.
@@ -25,69 +24,318 @@ impl Pty {
/// # Errors
/// Returns an error if we were unable to set the terminal size.
pub fn resize(&self, size: crate::Size) -> crate::Result<()> {
- Ok(crate::sys::set_term_size(self, size)?)
+ self.0.get_ref().set_term_size(size)
}
- pub(crate) fn pts(&self) -> &std::fs::File {
- &self.pts
+ /// Opens a file descriptor for the other end of the pty, which should be
+ /// attached to the child process running in it. See
+ /// [`Command::spawn`](crate::Command::spawn).
+ ///
+ /// # Errors
+ /// Returns an error if the device node to open could not be determined,
+ /// or if the device node could not be opened.
+ pub fn pts(&self) -> crate::Result<Pts> {
+ Ok(Pts(self.0.get_ref().pts()?))
}
-}
-
-impl std::ops::Deref for Pty {
- type Target = async_io::Async<std::fs::File>;
- fn deref(&self) -> &Self::Target {
- &self.pt
+ /// Splits a `Pty` into a read half and a write half, which can be used to
+ /// read from and write to the pty concurrently. Does not allocate, but
+ /// the returned halves cannot be moved to independent tasks.
+ pub fn split(&mut self) -> (ReadPty<'_>, WritePty<'_>) {
+ (ReadPty(&self.0), WritePty(&self.0))
}
-}
-impl std::ops::DerefMut for Pty {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.pt
+ /// Splits a `Pty` into a read half and a write half, which can be used to
+ /// read from and write to the pty concurrently. This method requires an
+ /// allocation, but the returned halves can be moved to independent tasks.
+ /// The original `Pty` instance can be recovered via the
+ /// [`OwnedReadPty::unsplit`] method.
+ #[must_use]
+ pub fn into_split(self) -> (OwnedReadPty, OwnedWritePty) {
+ let Self(pt) = self;
+ let read_pt = std::sync::Arc::new(pt);
+ let write_pt = std::sync::Arc::clone(&read_pt);
+ (OwnedReadPty(read_pt), OwnedWritePty(write_pt))
}
}
impl std::os::unix::io::AsRawFd for Pty {
fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
- self.pt.as_raw_fd()
+ self.0.as_raw_fd()
}
}
-// there is an AsyncRead impl for &Async<std::fs::File>, but without this
-// explicit impl, rust finds the AsyncRead impl for Async<std::fs::File>
-// first, and then complains that it requires &mut self, because method
-// resolution/autoderef doesn't take mutability into account
-impl futures_io::AsyncRead for &Pty {
+impl tokio::io::AsyncRead for Pty {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
- buf: &mut [u8],
- ) -> std::task::Poll<Result<usize, std::io::Error>> {
- std::pin::Pin::new(&mut &self.pt).poll_read(cx, buf)
+ buf: &mut tokio::io::ReadBuf,
+ ) -> std::task::Poll<std::io::Result<()>> {
+ loop {
+ let mut guard = match self.0.poll_read_ready(cx) {
+ std::task::Poll::Ready(guard) => guard,
+ std::task::Poll::Pending => return std::task::Poll::Pending,
+ }?;
+ let mut b = [0u8; 4096];
+ match guard.try_io(|inner| (&inner.get_ref().0).read(&mut b)) {
+ Ok(Ok(bytes)) => {
+ // XXX this is safe, but not particularly efficient
+ buf.clear();
+ buf.initialize_unfilled_to(bytes);
+ buf.set_filled(bytes);
+ buf.filled_mut().copy_from_slice(&b[..bytes]);
+ return std::task::Poll::Ready(Ok(()));
+ }
+ Ok(Err(e)) => return std::task::Poll::Ready(Err(e)),
+ Err(_would_block) => continue,
+ }
+ }
}
}
-// same as above
-impl futures_io::AsyncWrite for &Pty {
+impl tokio::io::AsyncWrite for Pty {
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
- ) -> std::task::Poll<Result<usize, std::io::Error>> {
- std::pin::Pin::new(&mut &self.pt).poll_write(cx, buf)
+ ) -> std::task::Poll<std::io::Result<usize>> {
+ loop {
+ let mut guard = match self.0.poll_write_ready(cx) {
+ std::task::Poll::Ready(guard) => guard,
+ std::task::Poll::Pending => return std::task::Poll::Pending,
+ }?;
+ match guard.try_io(|inner| (&inner.get_ref().0).write(buf)) {
+ Ok(result) => return std::task::Poll::Ready(result),
+ Err(_would_block) => continue,
+ }
+ }
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<std::io::Result<()>> {
+ loop {
+ let mut guard = match self.0.poll_write_ready(cx) {
+ std::task::Poll::Ready(guard) => guard,
+ std::task::Poll::Pending => return std::task::Poll::Pending,
+ }?;
+ match guard.try_io(|inner| (&inner.get_ref().0).flush()) {
+ Ok(_) => return std::task::Poll::Ready(Ok(())),
+ Err(_would_block) => continue,
+ }
+ }
+ }
+
+ fn poll_shutdown(
+ self: std::pin::Pin<&mut Self>,
+ _cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
- std::pin::Pin::new(&mut &self.pt).poll_flush(cx)
+ std::task::Poll::Ready(Ok(()))
}
+}
+
+/// The child end of the pty
+///
+/// See [`Pty::pts`] and [`Command::spawn`](crate::Command::spawn)
+pub struct Pts(pub(crate) crate::sys::Pts);
+
+/// Borrowed read half of a [`Pty`]
+pub struct ReadPty<'a>(&'a AsyncPty);
- fn poll_close(
+impl<'a> tokio::io::AsyncRead for ReadPty<'a> {
+ fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
+ buf: &mut tokio::io::ReadBuf,
+ ) -> std::task::Poll<std::io::Result<()>> {
+ loop {
+ let mut guard = match self.0.poll_read_ready(cx) {
+ std::task::Poll::Ready(guard) => guard,
+ std::task::Poll::Pending => return std::task::Poll::Pending,
+ }?;
+ let mut b = [0u8; 4096];
+ match guard.try_io(|inner| (&inner.get_ref().0).read(&mut b)) {
+ Ok(Ok(bytes)) => {
+ // XXX this is safe, but not particularly efficient
+ buf.clear();
+ buf.initialize_unfilled_to(bytes);
+ buf.set_filled(bytes);
+ buf.filled_mut().copy_from_slice(&b[..bytes]);
+ return std::task::Poll::Ready(Ok(()));
+ }
+ Ok(Err(e)) => return std::task::Poll::Ready(Err(e)),
+ Err(_would_block) => continue,
+ }
+ }
+ }
+}
+
+/// Borrowed write half of a [`Pty`]
+pub struct WritePty<'a>(&'a AsyncPty);
+
+impl<'a> WritePty<'a> {
+ /// Change the terminal size associated with the pty.
+ ///
+ /// # Errors
+ /// Returns an error if we were unable to set the terminal size.
+ pub fn resize(&self, size: crate::Size) -> crate::Result<()> {
+ self.0.get_ref().set_term_size(size)
+ }
+}
+
+impl<'a> tokio::io::AsyncWrite for WritePty<'a> {
+ fn poll_write(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &[u8],
+ ) -> std::task::Poll<std::io::Result<usize>> {
+ loop {
+ let mut guard = match self.0.poll_write_ready(cx) {
+ std::task::Poll::Ready(guard) => guard,
+ std::task::Poll::Pending => return std::task::Poll::Pending,
+ }?;
+ match guard.try_io(|inner| (&inner.get_ref().0).write(buf)) {
+ Ok(result) => return std::task::Poll::Ready(result),
+ Err(_would_block) => continue,
+ }
+ }
+ }
+
+ fn poll_flush(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<std::io::Result<()>> {
+ loop {
+ let mut guard = match self.0.poll_write_ready(cx) {
+ std::task::Poll::Ready(guard) => guard,
+ std::task::Poll::Pending => return std::task::Poll::Pending,
+ }?;
+ match guard.try_io(|inner| (&inner.get_ref().0).flush()) {
+ Ok(_) => return std::task::Poll::Ready(Ok(())),
+ Err(_would_block) => continue,
+ }
+ }
+ }
+
+ fn poll_shutdown(
+ self: std::pin::Pin<&mut Self>,
+ _cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), std::io::Error>> {
+ std::task::Poll::Ready(Ok(()))
+ }
+}
+
+/// Owned read half of a [`Pty`]
+#[derive(Debug)]
+pub struct OwnedReadPty(std::sync::Arc<AsyncPty>);
+
+impl OwnedReadPty {
+ /// Attempt to join the two halves of a `Pty` back into a single instance.
+ /// The two halves must have originated from calling
+ /// [`into_split`](Pty::into_split) on a single instance.
+ ///
+ /// # Errors
+ /// Returns an error if the two halves came from different [`Pty`]
+ /// instances. The mismatched halves are returned as part of the error.
+ pub fn unsplit(self, write_half: OwnedWritePty) -> crate::Result<Pty> {
+ let Self(read_pt) = self;
+ let OwnedWritePty(write_pt) = write_half;
+ if std::sync::Arc::ptr_eq(&read_pt, &write_pt) {
+ drop(write_pt);
+ Ok(Pty(std::sync::Arc::try_unwrap(read_pt)
+ // it shouldn't be possible for more than two references to
+ // the same pty to exist
+ .unwrap_or_else(|_| unreachable!())))
+ } else {
+ Err(crate::Error::Unsplit(
+ Self(read_pt),
+ OwnedWritePty(write_pt),
+ ))
+ }
+ }
+}
+
+impl tokio::io::AsyncRead for OwnedReadPty {
+ fn poll_read(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &mut tokio::io::ReadBuf,
+ ) -> std::task::Poll<std::io::Result<()>> {
+ loop {
+ let mut guard = match self.0.poll_read_ready(cx) {
+ std::task::Poll::Ready(guard) => guard,
+ std::task::Poll::Pending => return std::task::Poll::Pending,
+ }?;
+ let mut b = [0u8; 4096];
+ match guard.try_io(|inner| (&inner.get_ref().0).read(&mut b)) {
+ Ok(Ok(bytes)) => {
+ // XXX this is safe, but not particularly efficient
+ buf.clear();
+ buf.initialize_unfilled_to(bytes);
+ buf.set_filled(bytes);
+ buf.filled_mut().copy_from_slice(&b[..bytes]);
+ return std::task::Poll::Ready(Ok(()));
+ }
+ Ok(Err(e)) => return std::task::Poll::Ready(Err(e)),
+ Err(_would_block) => continue,
+ }
+ }
+ }
+}
+
+/// Owned write half of a [`Pty`]
+#[derive(Debug)]
+pub struct OwnedWritePty(std::sync::Arc<AsyncPty>);
+
+impl OwnedWritePty {
+ /// Change the terminal size associated with the pty.
+ ///
+ /// # Errors
+ /// Returns an error if we were unable to set the terminal size.
+ pub fn resize(&self, size: crate::Size) -> crate::Result<()> {
+ self.0.get_ref().set_term_size(size)
+ }
+}
+
+impl tokio::io::AsyncWrite for OwnedWritePty {
+ fn poll_write(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &[u8],
+ ) -> std::task::Poll<std::io::Result<usize>> {
+ loop {
+ let mut guard = match self.0.poll_write_ready(cx) {
+ std::task::Poll::Ready(guard) => guard,
+ std::task::Poll::Pending => return std::task::Poll::Pending,
+ }?;
+ match guard.try_io(|inner| (&inner.get_ref().0).write(buf)) {
+ Ok(result) => return std::task::Poll::Ready(result),
+ Err(_would_block) => continue,
+ }
+ }
+ }
+
+ fn poll_flush(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<std::io::Result<()>> {
+ loop {
+ let mut guard = match self.0.poll_write_ready(cx) {
+ std::task::Poll::Ready(guard) => guard,
+ std::task::Poll::Pending => return std::task::Poll::Pending,
+ }?;
+ match guard.try_io(|inner| (&inner.get_ref().0).flush()) {
+ Ok(_) => return std::task::Poll::Ready(Ok(())),
+ Err(_would_block) => continue,
+ }
+ }
+ }
+
+ fn poll_shutdown(
+ self: std::pin::Pin<&mut Self>,
+ _cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
- std::pin::Pin::new(&mut &self.pt).poll_close(cx)
+ std::task::Poll::Ready(Ok(()))
}
}