From 39287b07f87aba15c4cb0f64d7008ba67289151d Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Tue, 22 Feb 2022 17:02:12 -0500 Subject: another rewrite --- src/pty.rs | 328 +++++++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 288 insertions(+), 40 deletions(-) (limited to 'src/pty.rs') diff --git a/src/pty.rs b/src/pty.rs index 1d7f113..87fb311 100644 --- a/src/pty.rs +++ b/src/pty.rs @@ -1,8 +1,11 @@ +#![allow(clippy::module_name_repetitions)] + +use std::io::{Read as _, Write as _}; + +type AsyncPty = tokio::io::unix::AsyncFd; + /// An allocated pty -pub struct Pty { - pt: async_io::Async, - pts: std::fs::File, -} +pub struct Pty(AsyncPty); impl Pty { /// Allocate and return a new pty. @@ -11,13 +14,9 @@ impl Pty { /// Returns an error if the pty failed to be allocated, or if we were /// unable to put it into non-blocking mode. pub fn new() -> crate::Result { - let (pt, ptsname) = crate::sys::create_pt()?; - let pt = async_io::Async::new(pt)?; - let pts = std::fs::OpenOptions::new() - .read(true) - .write(true) - .open(&ptsname)?; - Ok(Self { pt, pts }) + let pty = crate::sys::Pty::open()?; + pty.set_nonblocking()?; + Ok(Self(tokio::io::unix::AsyncFd::new(pty)?)) } /// Change the terminal size associated with the pty. @@ -25,69 +24,318 @@ impl Pty { /// # Errors /// Returns an error if we were unable to set the terminal size. pub fn resize(&self, size: crate::Size) -> crate::Result<()> { - Ok(crate::sys::set_term_size(self, size)?) + self.0.get_ref().set_term_size(size) } - pub(crate) fn pts(&self) -> &std::fs::File { - &self.pts + /// Opens a file descriptor for the other end of the pty, which should be + /// attached to the child process running in it. See + /// [`Command::spawn`](crate::Command::spawn). + /// + /// # Errors + /// Returns an error if the device node to open could not be determined, + /// or if the device node could not be opened. + pub fn pts(&self) -> crate::Result { + Ok(Pts(self.0.get_ref().pts()?)) } -} - -impl std::ops::Deref for Pty { - type Target = async_io::Async; - fn deref(&self) -> &Self::Target { - &self.pt + /// Splits a `Pty` into a read half and a write half, which can be used to + /// read from and write to the pty concurrently. Does not allocate, but + /// the returned halves cannot be moved to independent tasks. + pub fn split(&mut self) -> (ReadPty<'_>, WritePty<'_>) { + (ReadPty(&self.0), WritePty(&self.0)) } -} -impl std::ops::DerefMut for Pty { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.pt + /// Splits a `Pty` into a read half and a write half, which can be used to + /// read from and write to the pty concurrently. This method requires an + /// allocation, but the returned halves can be moved to independent tasks. + /// The original `Pty` instance can be recovered via the + /// [`OwnedReadPty::unsplit`] method. + #[must_use] + pub fn into_split(self) -> (OwnedReadPty, OwnedWritePty) { + let Self(pt) = self; + let read_pt = std::sync::Arc::new(pt); + let write_pt = std::sync::Arc::clone(&read_pt); + (OwnedReadPty(read_pt), OwnedWritePty(write_pt)) } } impl std::os::unix::io::AsRawFd for Pty { fn as_raw_fd(&self) -> std::os::unix::io::RawFd { - self.pt.as_raw_fd() + self.0.as_raw_fd() } } -// there is an AsyncRead impl for &Async, but without this -// explicit impl, rust finds the AsyncRead impl for Async -// first, and then complains that it requires &mut self, because method -// resolution/autoderef doesn't take mutability into account -impl futures_io::AsyncRead for &Pty { +impl tokio::io::AsyncRead for Pty { fn poll_read( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, - buf: &mut [u8], - ) -> std::task::Poll> { - std::pin::Pin::new(&mut &self.pt).poll_read(cx, buf) + buf: &mut tokio::io::ReadBuf, + ) -> std::task::Poll> { + loop { + let mut guard = match self.0.poll_read_ready(cx) { + std::task::Poll::Ready(guard) => guard, + std::task::Poll::Pending => return std::task::Poll::Pending, + }?; + let mut b = [0u8; 4096]; + match guard.try_io(|inner| (&inner.get_ref().0).read(&mut b)) { + Ok(Ok(bytes)) => { + // XXX this is safe, but not particularly efficient + buf.clear(); + buf.initialize_unfilled_to(bytes); + buf.set_filled(bytes); + buf.filled_mut().copy_from_slice(&b[..bytes]); + return std::task::Poll::Ready(Ok(())); + } + Ok(Err(e)) => return std::task::Poll::Ready(Err(e)), + Err(_would_block) => continue, + } + } } } -// same as above -impl futures_io::AsyncWrite for &Pty { +impl tokio::io::AsyncWrite for Pty { fn poll_write( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8], - ) -> std::task::Poll> { - std::pin::Pin::new(&mut &self.pt).poll_write(cx, buf) + ) -> std::task::Poll> { + loop { + let mut guard = match self.0.poll_write_ready(cx) { + std::task::Poll::Ready(guard) => guard, + std::task::Poll::Pending => return std::task::Poll::Pending, + }?; + match guard.try_io(|inner| (&inner.get_ref().0).write(buf)) { + Ok(result) => return std::task::Poll::Ready(result), + Err(_would_block) => continue, + } + } } fn poll_flush( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + loop { + let mut guard = match self.0.poll_write_ready(cx) { + std::task::Poll::Ready(guard) => guard, + std::task::Poll::Pending => return std::task::Poll::Pending, + }?; + match guard.try_io(|inner| (&inner.get_ref().0).flush()) { + Ok(_) => return std::task::Poll::Ready(Ok(())), + Err(_would_block) => continue, + } + } + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - std::pin::Pin::new(&mut &self.pt).poll_flush(cx) + std::task::Poll::Ready(Ok(())) } +} + +/// The child end of the pty +/// +/// See [`Pty::pts`] and [`Command::spawn`](crate::Command::spawn) +pub struct Pts(pub(crate) crate::sys::Pts); + +/// Borrowed read half of a [`Pty`] +pub struct ReadPty<'a>(&'a AsyncPty); - fn poll_close( +impl<'a> tokio::io::AsyncRead for ReadPty<'a> { + fn poll_read( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf, + ) -> std::task::Poll> { + loop { + let mut guard = match self.0.poll_read_ready(cx) { + std::task::Poll::Ready(guard) => guard, + std::task::Poll::Pending => return std::task::Poll::Pending, + }?; + let mut b = [0u8; 4096]; + match guard.try_io(|inner| (&inner.get_ref().0).read(&mut b)) { + Ok(Ok(bytes)) => { + // XXX this is safe, but not particularly efficient + buf.clear(); + buf.initialize_unfilled_to(bytes); + buf.set_filled(bytes); + buf.filled_mut().copy_from_slice(&b[..bytes]); + return std::task::Poll::Ready(Ok(())); + } + Ok(Err(e)) => return std::task::Poll::Ready(Err(e)), + Err(_would_block) => continue, + } + } + } +} + +/// Borrowed write half of a [`Pty`] +pub struct WritePty<'a>(&'a AsyncPty); + +impl<'a> WritePty<'a> { + /// Change the terminal size associated with the pty. + /// + /// # Errors + /// Returns an error if we were unable to set the terminal size. + pub fn resize(&self, size: crate::Size) -> crate::Result<()> { + self.0.get_ref().set_term_size(size) + } +} + +impl<'a> tokio::io::AsyncWrite for WritePty<'a> { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + loop { + let mut guard = match self.0.poll_write_ready(cx) { + std::task::Poll::Ready(guard) => guard, + std::task::Poll::Pending => return std::task::Poll::Pending, + }?; + match guard.try_io(|inner| (&inner.get_ref().0).write(buf)) { + Ok(result) => return std::task::Poll::Ready(result), + Err(_would_block) => continue, + } + } + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + loop { + let mut guard = match self.0.poll_write_ready(cx) { + std::task::Poll::Ready(guard) => guard, + std::task::Poll::Pending => return std::task::Poll::Pending, + }?; + match guard.try_io(|inner| (&inner.get_ref().0).flush()) { + Ok(_) => return std::task::Poll::Ready(Ok(())), + Err(_would_block) => continue, + } + } + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } +} + +/// Owned read half of a [`Pty`] +#[derive(Debug)] +pub struct OwnedReadPty(std::sync::Arc); + +impl OwnedReadPty { + /// Attempt to join the two halves of a `Pty` back into a single instance. + /// The two halves must have originated from calling + /// [`into_split`](Pty::into_split) on a single instance. + /// + /// # Errors + /// Returns an error if the two halves came from different [`Pty`] + /// instances. The mismatched halves are returned as part of the error. + pub fn unsplit(self, write_half: OwnedWritePty) -> crate::Result { + let Self(read_pt) = self; + let OwnedWritePty(write_pt) = write_half; + if std::sync::Arc::ptr_eq(&read_pt, &write_pt) { + drop(write_pt); + Ok(Pty(std::sync::Arc::try_unwrap(read_pt) + // it shouldn't be possible for more than two references to + // the same pty to exist + .unwrap_or_else(|_| unreachable!()))) + } else { + Err(crate::Error::Unsplit( + Self(read_pt), + OwnedWritePty(write_pt), + )) + } + } +} + +impl tokio::io::AsyncRead for OwnedReadPty { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf, + ) -> std::task::Poll> { + loop { + let mut guard = match self.0.poll_read_ready(cx) { + std::task::Poll::Ready(guard) => guard, + std::task::Poll::Pending => return std::task::Poll::Pending, + }?; + let mut b = [0u8; 4096]; + match guard.try_io(|inner| (&inner.get_ref().0).read(&mut b)) { + Ok(Ok(bytes)) => { + // XXX this is safe, but not particularly efficient + buf.clear(); + buf.initialize_unfilled_to(bytes); + buf.set_filled(bytes); + buf.filled_mut().copy_from_slice(&b[..bytes]); + return std::task::Poll::Ready(Ok(())); + } + Ok(Err(e)) => return std::task::Poll::Ready(Err(e)), + Err(_would_block) => continue, + } + } + } +} + +/// Owned write half of a [`Pty`] +#[derive(Debug)] +pub struct OwnedWritePty(std::sync::Arc); + +impl OwnedWritePty { + /// Change the terminal size associated with the pty. + /// + /// # Errors + /// Returns an error if we were unable to set the terminal size. + pub fn resize(&self, size: crate::Size) -> crate::Result<()> { + self.0.get_ref().set_term_size(size) + } +} + +impl tokio::io::AsyncWrite for OwnedWritePty { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + loop { + let mut guard = match self.0.poll_write_ready(cx) { + std::task::Poll::Ready(guard) => guard, + std::task::Poll::Pending => return std::task::Poll::Pending, + }?; + match guard.try_io(|inner| (&inner.get_ref().0).write(buf)) { + Ok(result) => return std::task::Poll::Ready(result), + Err(_would_block) => continue, + } + } + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + loop { + let mut guard = match self.0.poll_write_ready(cx) { + std::task::Poll::Ready(guard) => guard, + std::task::Poll::Pending => return std::task::Poll::Pending, + }?; + match guard.try_io(|inner| (&inner.get_ref().0).flush()) { + Ok(_) => return std::task::Poll::Ready(Ok(())), + Err(_would_block) => continue, + } + } + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - std::pin::Pin::new(&mut &self.pt).poll_close(cx) + std::task::Poll::Ready(Ok(())) } } -- cgit v1.2.3-54-g00ecf