From fde3881053377bd63d7ca08d0604e6ca87860b9c Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Wed, 10 Nov 2021 14:06:31 -0500 Subject: wire up sending input to active process --- src/history.rs | 76 ++++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 56 insertions(+), 20 deletions(-) (limited to 'src/history.rs') diff --git a/src/history.rs b/src/history.rs index 128535b..3586a3c 100644 --- a/src/history.rs +++ b/src/history.rs @@ -1,4 +1,4 @@ -use async_std::io::ReadExt as _; +use async_std::io::{ReadExt as _, WriteExt as _}; use pty_process::Command as _; use textmode::Textmode as _; @@ -24,37 +24,61 @@ impl History { let child = process .spawn_pty(Some(&pty_process::Size::new(24, 80))) .unwrap(); + let (input_w, input_r) = async_std::channel::unbounded(); let entry = crate::util::mutex(HistoryEntry::new( cmd, child.id().try_into().unwrap(), + input_w, )); let task_entry = async_std::sync::Arc::clone(&entry); let task_action = self.action.clone(); async_std::task::spawn(async move { loop { let mut buf = [0_u8; 4096]; - match child.pty().read(&mut buf).await { - Ok(bytes) => { - task_entry.lock_arc().await.vt.process(&buf[..bytes]); - } - Err(e) => { - if e.raw_os_error() != Some(libc::EIO) { - eprintln!("pty read failed: {:?}", e); + let mut pty = child.pty(); + let read = pty.read(&mut buf); + let write = input_r.recv(); + match futures::future::select(read, write).await { + futures::future::Either::Left((res, _)) => { + match res { + Ok(bytes) => { + task_entry + .lock_arc() + .await + .vt + .process(&buf[..bytes]); + } + Err(e) => { + if e.raw_os_error() != Some(libc::EIO) { + eprintln!("pty read failed: {:?}", e); + } + task_entry.lock_arc().await.running = false; + task_action + .send(crate::state::Action::UpdateFocus( + crate::state::Focus::Readline, + )) + .await + .unwrap(); + break; + } } - task_entry.lock_arc().await.running = false; task_action - .send(crate::state::Action::UpdateFocus( - crate::state::Focus::Readline, - )) + .send(crate::state::Action::Render) .await .unwrap(); - break; } + futures::future::Either::Right((res, _)) => match res { + Ok(bytes) => { + pty.write(&bytes).await.unwrap(); + } + Err(e) => { + panic!( + "failed to read from input channel: {}", + e + ); + } + }, } - task_action - .send(crate::state::Action::Render) - .await - .unwrap(); } }); self.entries.push(entry); @@ -86,7 +110,6 @@ impl History { .await .unwrap(); } - textmode::Key::Ctrl(_) => {} key => { self.send_process_input(idx, &key.into_bytes()) .await @@ -159,7 +182,14 @@ impl History { idx: usize, input: &[u8], ) -> anyhow::Result<()> { - todo!() + self.entries[idx] + .lock_arc() + .await + .input + .send(input.to_vec()) + .await + .unwrap(); + Ok(()) } } @@ -167,16 +197,22 @@ struct HistoryEntry { cmd: String, pid: nix::unistd::Pid, vt: vt100::Parser, + input: async_std::channel::Sender>, running: bool, // option end time // start time } impl HistoryEntry { - fn new(cmd: &str, pid: i32) -> Self { + fn new( + cmd: &str, + pid: i32, + input: async_std::channel::Sender>, + ) -> Self { Self { cmd: cmd.into(), pid: nix::unistd::Pid::from_raw(pid), vt: vt100::Parser::new(24, 80, 0), + input, running: true, } } -- cgit v1.2.3-54-g00ecf