diff options
Diffstat (limited to 'src/shell/history/pty.rs')
-rw-r--r-- | src/shell/history/pty.rs | 118 |
1 files changed, 56 insertions, 62 deletions
diff --git a/src/shell/history/pty.rs b/src/shell/history/pty.rs index 5a51e73..acfe500 100644 --- a/src/shell/history/pty.rs +++ b/src/shell/history/pty.rs @@ -1,26 +1,26 @@ use crate::shell::prelude::*; pub struct Pty { - pty: async_std::sync::Arc<pty_process::Pty>, - close_w: async_std::channel::Sender<()>, + pts: pty_process::Pts, + close_w: tokio::sync::mpsc::UnboundedSender<()>, } impl Pty { pub fn new( size: (u16, u16), entry: &crate::mutex::Mutex<super::Entry>, - input_r: async_std::channel::Receiver<Vec<u8>>, - resize_r: async_std::channel::Receiver<(u16, u16)>, - event_w: async_std::channel::Sender<Event>, + input_r: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>, + resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>, + event_w: tokio::sync::mpsc::UnboundedSender<Event>, ) -> anyhow::Result<Self> { - let (close_w, close_r) = async_std::channel::unbounded(); + let (close_w, close_r) = tokio::sync::mpsc::unbounded_channel(); let pty = pty_process::Pty::new()?; pty.resize(pty_process::Size::new(size.0, size.1))?; - let pty = async_std::sync::Arc::new(pty); + let pts = pty.pts()?; - async_std::task::spawn(pty_task( - async_std::sync::Arc::clone(&pty), + tokio::task::spawn(pty_task( + pty, crate::mutex::clone(entry), input_r, resize_r, @@ -28,80 +28,74 @@ impl Pty { event_w, )); - Ok(Self { pty, close_w }) + Ok(Self { pts, close_w }) } pub fn spawn( &self, mut cmd: pty_process::Command, - ) -> anyhow::Result<async_std::process::Child> { - Ok(cmd.spawn(&self.pty)?) + ) -> anyhow::Result<tokio::process::Child> { + Ok(cmd.spawn(&self.pts)?) } pub async fn close(&self) { - self.close_w.send(()).await.unwrap(); + self.close_w.send(()).unwrap(); } } async fn pty_task( - pty: async_std::sync::Arc<pty_process::Pty>, + pty: pty_process::Pty, entry: crate::mutex::Mutex<super::Entry>, - input_r: async_std::channel::Receiver<Vec<u8>>, - resize_r: async_std::channel::Receiver<(u16, u16)>, - close_r: async_std::channel::Receiver<()>, - event_w: async_std::channel::Sender<Event>, + input_r: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>, + resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>, + close_r: tokio::sync::mpsc::UnboundedReceiver<()>, + event_w: tokio::sync::mpsc::UnboundedSender<Event>, ) { - loop { - enum Res { - Read(Result<usize, std::io::Error>), - Write(Result<Vec<u8>, async_std::channel::RecvError>), - Resize(Result<(u16, u16), async_std::channel::RecvError>), - Close(Result<(), async_std::channel::RecvError>), - } - let mut buf = [0_u8; 4096]; - let read = async { Res::Read((&*pty).read(&mut buf).await) }; - let write = async { Res::Write(input_r.recv().await) }; - let resize = async { Res::Resize(resize_r.recv().await) }; - let close = async { Res::Close(close_r.recv().await) }; - match read.race(write).race(resize).or(close).await { + enum Res { + Read(Result<bytes::Bytes, std::io::Error>), + Write(Vec<u8>), + Resize((u16, u16)), + Close(()), + } + + let (pty_r, mut pty_w) = pty.into_split(); + let mut stream: futures_util::stream::SelectAll<_> = [ + tokio_util::io::ReaderStream::new(pty_r) + .map(Res::Read) + .boxed(), + tokio_stream::wrappers::UnboundedReceiverStream::new(input_r) + .map(Res::Write) + .boxed(), + tokio_stream::wrappers::UnboundedReceiverStream::new(resize_r) + .map(Res::Resize) + .boxed(), + tokio_stream::wrappers::UnboundedReceiverStream::new(close_r) + .map(Res::Close) + .boxed(), + ] + .into_iter() + .collect(); + while let Some(res) = stream.next().await { + match res { Res::Read(res) => match res { Ok(bytes) => { - entry.lock_arc().await.process(&buf[..bytes]); - event_w.send(Event::PtyOutput).await.unwrap(); + entry.clone().lock_owned().await.process(&bytes); + event_w.send(Event::PtyOutput).unwrap(); } Err(e) => { - if e.raw_os_error() == Some(libc::EIO) { - continue; - } panic!("pty read failed: {:?}", e); } }, - Res::Write(res) => match res { - Ok(bytes) => { - (&*pty).write(&bytes).await.unwrap(); - } - Err(e) => { - panic!("failed to read from input channel: {}", e); - } - }, - Res::Resize(res) => match res { - Ok(size) => { - pty.resize(pty_process::Size::new(size.0, size.1)) - .unwrap(); - } - Err(e) => { - panic!("failed to read from resize channel: {}", e); - } - }, - Res::Close(res) => match res { - Ok(()) => { - event_w.send(Event::PtyClose).await.unwrap(); - return; - } - Err(e) => { - panic!("failed to read from close channel: {}", e); - } - }, + Res::Write(bytes) => { + pty_w.write(&bytes).await.unwrap(); + } + Res::Resize(size) => pty_w + .resize(pty_process::Size::new(size.0, size.1)) + .unwrap(), + Res::Close(()) => { + event_w.send(Event::PtyClose).unwrap(); + return; + } } } } |