From a30174620d6b64f838989a634c265a353b2ab117 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Wed, 5 Jan 2022 07:18:29 -0500 Subject: a bunch more reorganization --- src/shell/history/pty.rs | 109 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) create mode 100644 src/shell/history/pty.rs (limited to 'src/shell/history/pty.rs') diff --git a/src/shell/history/pty.rs b/src/shell/history/pty.rs new file mode 100644 index 0000000..602a568 --- /dev/null +++ b/src/shell/history/pty.rs @@ -0,0 +1,109 @@ +use crate::shell::prelude::*; + +use async_std::io::{ReadExt as _, WriteExt as _}; +use futures_lite::future::FutureExt as _; + +pub struct Pty { + pty: async_std::sync::Arc, + close_w: async_std::channel::Sender<()>, +} + +impl Pty { + pub fn new( + size: (u16, u16), + entry: &async_std::sync::Arc>, + input_r: async_std::channel::Receiver>, + resize_r: async_std::channel::Receiver<(u16, u16)>, + event_w: async_std::channel::Sender, + ) -> anyhow::Result { + let (close_w, close_r) = async_std::channel::unbounded(); + + let pty = pty_process::Pty::new()?; + pty.resize(pty_process::Size::new(size.0, size.1))?; + let pty = async_std::sync::Arc::new(pty); + + async_std::task::spawn(pty_task( + async_std::sync::Arc::clone(&pty), + async_std::sync::Arc::clone(entry), + input_r, + resize_r, + close_r, + event_w, + )); + + Ok(Self { pty, close_w }) + } + + pub fn spawn( + &self, + mut cmd: pty_process::Command, + ) -> anyhow::Result { + Ok(cmd.spawn(&self.pty)?) + } + + pub async fn close(&self) { + self.close_w.send(()).await.unwrap(); + } +} + +async fn pty_task( + pty: async_std::sync::Arc, + entry: async_std::sync::Arc>, + input_r: async_std::channel::Receiver>, + resize_r: async_std::channel::Receiver<(u16, u16)>, + close_r: async_std::channel::Receiver<()>, + event_w: async_std::channel::Sender, +) { + loop { + enum Res { + Read(Result), + Write(Result, async_std::channel::RecvError>), + Resize(Result<(u16, u16), async_std::channel::RecvError>), + Close(Result<(), async_std::channel::RecvError>), + } + let mut buf = [0_u8; 4096]; + let read = async { Res::Read((&*pty).read(&mut buf).await) }; + let write = async { Res::Write(input_r.recv().await) }; + let resize = async { Res::Resize(resize_r.recv().await) }; + let close = async { Res::Close(close_r.recv().await) }; + match read.race(write).race(resize).or(close).await { + Res::Read(res) => match res { + Ok(bytes) => { + entry.lock_arc().await.process(&buf[..bytes]); + event_w.send(Event::PtyOutput).await.unwrap(); + } + Err(e) => { + if e.raw_os_error() != Some(libc::EIO) { + panic!("pty read failed: {:?}", e); + } + } + }, + Res::Write(res) => match res { + Ok(bytes) => { + (&*pty).write(&bytes).await.unwrap(); + } + Err(e) => { + panic!("failed to read from input channel: {}", e); + } + }, + Res::Resize(res) => match res { + Ok(size) => { + pty.resize(pty_process::Size::new(size.0, size.1)) + .unwrap(); + } + Err(e) => { + panic!("failed to read from resize channel: {}", e); + } + }, + Res::Close(res) => match res { + Ok(()) => { + event_w.send(Event::PtyClose).await.unwrap(); + return; + } + Err(e) => { + panic!("failed to read from close channel: {}", e); + } + }, + } + } +} -- cgit v1.2.3-54-g00ecf