#![allow(clippy::module_name_repetitions)] use std::io::{Read as _, Write as _}; type AsyncPty = tokio::io::unix::AsyncFd; /// An allocated pty pub struct Pty(AsyncPty); impl Pty { /// Allocate and return a new pty. /// /// # Errors /// Returns an error if the pty failed to be allocated, or if we were /// unable to put it into non-blocking mode. pub fn new() -> crate::Result { let pty = crate::sys::Pty::open()?; pty.set_nonblocking()?; Ok(Self(tokio::io::unix::AsyncFd::new(pty)?)) } /// Change the terminal size associated with the pty. /// /// # Errors /// Returns an error if we were unable to set the terminal size. pub fn resize(&self, size: crate::Size) -> crate::Result<()> { self.0.get_ref().set_term_size(size) } /// Opens a file descriptor for the other end of the pty, which should be /// attached to the child process running in it. See /// [`Command::spawn`](crate::Command::spawn). /// /// # Errors /// Returns an error if the device node to open could not be determined, /// or if the device node could not be opened. pub fn pts(&self) -> crate::Result { Ok(Pts(self.0.get_ref().pts()?)) } /// Splits a `Pty` into a read half and a write half, which can be used to /// read from and write to the pty concurrently. Does not allocate, but /// the returned halves cannot be moved to independent tasks. pub fn split(&mut self) -> (ReadPty<'_>, WritePty<'_>) { (ReadPty(&self.0), WritePty(&self.0)) } /// Splits a `Pty` into a read half and a write half, which can be used to /// read from and write to the pty concurrently. This method requires an /// allocation, but the returned halves can be moved to independent tasks. /// The original `Pty` instance can be recovered via the /// [`OwnedReadPty::unsplit`] method. #[must_use] pub fn into_split(self) -> (OwnedReadPty, OwnedWritePty) { let Self(pt) = self; let read_pt = std::sync::Arc::new(pt); let write_pt = std::sync::Arc::clone(&read_pt); (OwnedReadPty(read_pt), OwnedWritePty(write_pt)) } } impl From for std::os::fd::OwnedFd { fn from(pty: Pty) -> Self { pty.0.into_inner().into() } } impl std::os::fd::AsFd for Pty { fn as_fd(&self) -> std::os::fd::BorrowedFd<'_> { self.0.get_ref().as_fd() } } impl std::os::fd::AsRawFd for Pty { fn as_raw_fd(&self) -> std::os::fd::RawFd { self.0.get_ref().as_raw_fd() } } impl tokio::io::AsyncRead for Pty { fn poll_read( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut tokio::io::ReadBuf, ) -> std::task::Poll> { loop { let mut guard = match self.0.poll_read_ready(cx) { std::task::Poll::Ready(guard) => guard, std::task::Poll::Pending => return std::task::Poll::Pending, }?; // XXX should be able to optimize this once read_buf is stabilized // in std let b = buf.initialize_unfilled(); match guard.try_io(|inner| inner.get_ref().read(b)) { Ok(Ok(bytes)) => { buf.advance(bytes); return std::task::Poll::Ready(Ok(())); } Ok(Err(e)) => return std::task::Poll::Ready(Err(e)), Err(_would_block) => continue, } } } } impl tokio::io::AsyncWrite for Pty { fn poll_write( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8], ) -> std::task::Poll> { loop { let mut guard = match self.0.poll_write_ready(cx) { std::task::Poll::Ready(guard) => guard, std::task::Poll::Pending => return std::task::Poll::Pending, }?; match guard.try_io(|inner| inner.get_ref().write(buf)) { Ok(result) => return std::task::Poll::Ready(result), Err(_would_block) => continue, } } } fn poll_flush( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { loop { let mut guard = match self.0.poll_write_ready(cx) { std::task::Poll::Ready(guard) => guard, std::task::Poll::Pending => return std::task::Poll::Pending, }?; match guard.try_io(|inner| inner.get_ref().flush()) { Ok(_) => return std::task::Poll::Ready(Ok(())), Err(_would_block) => continue, } } } fn poll_shutdown( self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { std::task::Poll::Ready(Ok(())) } } /// The child end of the pty /// /// See [`Pty::pts`] and [`Command::spawn`](crate::Command::spawn) pub struct Pts(pub(crate) crate::sys::Pts); /// Borrowed read half of a [`Pty`] pub struct ReadPty<'a>(&'a AsyncPty); impl<'a> tokio::io::AsyncRead for ReadPty<'a> { fn poll_read( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut tokio::io::ReadBuf, ) -> std::task::Poll> { loop { let mut guard = match self.0.poll_read_ready(cx) { std::task::Poll::Ready(guard) => guard, std::task::Poll::Pending => return std::task::Poll::Pending, }?; // XXX should be able to optimize this once read_buf is stabilized // in std let b = buf.initialize_unfilled(); match guard.try_io(|inner| inner.get_ref().read(b)) { Ok(Ok(bytes)) => { buf.advance(bytes); return std::task::Poll::Ready(Ok(())); } Ok(Err(e)) => return std::task::Poll::Ready(Err(e)), Err(_would_block) => continue, } } } } /// Borrowed write half of a [`Pty`] pub struct WritePty<'a>(&'a AsyncPty); impl<'a> WritePty<'a> { /// Change the terminal size associated with the pty. /// /// # Errors /// Returns an error if we were unable to set the terminal size. pub fn resize(&self, size: crate::Size) -> crate::Result<()> { self.0.get_ref().set_term_size(size) } } impl<'a> tokio::io::AsyncWrite for WritePty<'a> { fn poll_write( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8], ) -> std::task::Poll> { loop { let mut guard = match self.0.poll_write_ready(cx) { std::task::Poll::Ready(guard) => guard, std::task::Poll::Pending => return std::task::Poll::Pending, }?; match guard.try_io(|inner| inner.get_ref().write(buf)) { Ok(result) => return std::task::Poll::Ready(result), Err(_would_block) => continue, } } } fn poll_flush( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { loop { let mut guard = match self.0.poll_write_ready(cx) { std::task::Poll::Ready(guard) => guard, std::task::Poll::Pending => return std::task::Poll::Pending, }?; match guard.try_io(|inner| inner.get_ref().flush()) { Ok(_) => return std::task::Poll::Ready(Ok(())), Err(_would_block) => continue, } } } fn poll_shutdown( self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { std::task::Poll::Ready(Ok(())) } } /// Owned read half of a [`Pty`] #[derive(Debug)] pub struct OwnedReadPty(std::sync::Arc); impl OwnedReadPty { /// Attempt to join the two halves of a `Pty` back into a single instance. /// The two halves must have originated from calling /// [`into_split`](Pty::into_split) on a single instance. /// /// # Errors /// Returns an error if the two halves came from different [`Pty`] /// instances. The mismatched halves are returned as part of the error. pub fn unsplit(self, write_half: OwnedWritePty) -> crate::Result { let Self(read_pt) = self; let OwnedWritePty(write_pt) = write_half; if std::sync::Arc::ptr_eq(&read_pt, &write_pt) { drop(write_pt); Ok(Pty(std::sync::Arc::try_unwrap(read_pt) // it shouldn't be possible for more than two references to // the same pty to exist .unwrap_or_else(|_| unreachable!()))) } else { Err(crate::Error::Unsplit( Self(read_pt), OwnedWritePty(write_pt), )) } } } impl tokio::io::AsyncRead for OwnedReadPty { fn poll_read( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut tokio::io::ReadBuf, ) -> std::task::Poll> { loop { let mut guard = match self.0.poll_read_ready(cx) { std::task::Poll::Ready(guard) => guard, std::task::Poll::Pending => return std::task::Poll::Pending, }?; // XXX should be able to optimize this once read_buf is stabilized // in std let b = buf.initialize_unfilled(); match guard.try_io(|inner| inner.get_ref().read(b)) { Ok(Ok(bytes)) => { buf.advance(bytes); return std::task::Poll::Ready(Ok(())); } Ok(Err(e)) => return std::task::Poll::Ready(Err(e)), Err(_would_block) => continue, } } } } /// Owned write half of a [`Pty`] #[derive(Debug)] pub struct OwnedWritePty(std::sync::Arc); impl OwnedWritePty { /// Change the terminal size associated with the pty. /// /// # Errors /// Returns an error if we were unable to set the terminal size. pub fn resize(&self, size: crate::Size) -> crate::Result<()> { self.0.get_ref().set_term_size(size) } } impl tokio::io::AsyncWrite for OwnedWritePty { fn poll_write( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8], ) -> std::task::Poll> { loop { let mut guard = match self.0.poll_write_ready(cx) { std::task::Poll::Ready(guard) => guard, std::task::Poll::Pending => return std::task::Poll::Pending, }?; match guard.try_io(|inner| inner.get_ref().write(buf)) { Ok(result) => return std::task::Poll::Ready(result), Err(_would_block) => continue, } } } fn poll_flush( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { loop { let mut guard = match self.0.poll_write_ready(cx) { std::task::Poll::Ready(guard) => guard, std::task::Poll::Pending => return std::task::Poll::Pending, }?; match guard.try_io(|inner| inner.get_ref().flush()) { Ok(_) => return std::task::Poll::Ready(Ok(())), Err(_would_block) => continue, } } } fn poll_shutdown( self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { std::task::Poll::Ready(Ok(())) } }