diff options
-rw-r--r-- | src/state/history/builtins.rs | 35 | ||||
-rw-r--r-- | src/state/history/mod.rs | 265 | ||||
-rw-r--r-- | src/state/history/pty.rs | 128 |
3 files changed, 192 insertions, 236 deletions
diff --git a/src/state/history/builtins.rs b/src/state/history/builtins.rs index 49043cd..329aa97 100644 --- a/src/state/history/builtins.rs +++ b/src/state/history/builtins.rs @@ -1,3 +1,4 @@ +use async_std::io::WriteExt as _; use std::os::unix::process::ExitStatusExt as _; type Builtin = &'static (dyn for<'a> Fn( @@ -99,7 +100,10 @@ async fn cd( home().join(path.strip_prefix(prefix).unwrap()) } else { // TODO - env.write_pty(b"unimplemented\n").await.unwrap(); + async_std::io::stderr() + .write(b"unimplemented\n") + .await + .unwrap(); return async_std::process::ExitStatus::from_raw(1 << 8); } } else { @@ -111,17 +115,18 @@ async fn cd( let code = match std::env::set_current_dir(&dir) { Ok(()) => 0, Err(e) => { - env.write_pty( - format!( - "{}: {}: {}\n", - exe.exe(), - crate::format::io_error(&e), - dir.display() + async_std::io::stderr() + .write( + format!( + "{}: {}: {}\n", + exe.exe(), + crate::format::io_error(&e), + dir.display() + ) + .as_bytes(), ) - .as_bytes(), - ) - .await - .unwrap(); + .await + .unwrap(); 1 } }; @@ -134,7 +139,8 @@ async fn and( ) -> async_std::process::ExitStatus { let exe = exe.shift(); if env.latest_status().success() { - super::run_exe(&exe, env).await + todo!() + // super::run_exe(&exe, env).await } else { *env.latest_status() } @@ -148,7 +154,8 @@ async fn or( if env.latest_status().success() { *env.latest_status() } else { - super::run_exe(&exe, env).await + todo!() + // super::run_exe(&exe, env).await } } @@ -157,7 +164,7 @@ async fn command( env: &super::ProcessEnv, ) -> async_std::process::ExitStatus { let exe = exe.shift(); - super::run_binary(&exe, env).await; + // super::run_binary(&exe, env).await; *env.latest_status() } diff --git a/src/state/history/mod.rs b/src/state/history/mod.rs index 1912964..6263a1f 100644 --- a/src/state/history/mod.rs +++ b/src/state/history/mod.rs @@ -1,9 +1,9 @@ -use async_std::io::{ReadExt as _, WriteExt as _}; -use futures_lite::future::FutureExt as _; +use async_std::io::WriteExt as _; use std::os::unix::io::FromRawFd as _; use std::os::unix::process::ExitStatusExt as _; mod builtins; +mod pty; pub struct History { size: (u16, u16), @@ -93,20 +93,21 @@ impl History { ) -> anyhow::Result<usize> { let (input_w, input_r) = async_std::channel::unbounded(); let (resize_w, resize_r) = async_std::channel::unbounded(); + let (close_w, close_r) = async_std::channel::unbounded(); let entry = async_std::sync::Arc::new(async_std::sync::Mutex::new( Entry::new(Ok(ast.clone()), self.size, input_w, resize_w), )); + let pty = pty::Pty::new( + self.size, &entry, input_r, resize_r, close_r, event_w, + )?; run_commands( ast.clone(), - ProcessEnv::new( - async_std::sync::Arc::clone(&entry), - self.size, - input_r, - resize_r, - event_w, - ), + pty, + async_std::sync::Arc::clone(&entry), + ProcessEnv::new(), + close_w, ); self.entries.push(entry); @@ -545,64 +546,16 @@ impl ExitInfo { #[derive(Clone)] pub struct ProcessEnv { - entry: async_std::sync::Arc<async_std::sync::Mutex<Entry>>, - pty: async_std::sync::Arc<pty_process::Pty>, - input_r: async_std::channel::Receiver<Vec<u8>>, - resize_r: async_std::channel::Receiver<(u16, u16)>, - event_w: async_std::channel::Sender<crate::event::Event>, - latest_status: async_std::process::ExitStatus, } impl ProcessEnv { - fn new( - entry: async_std::sync::Arc<async_std::sync::Mutex<Entry>>, - size: (u16, u16), - input_r: async_std::channel::Receiver<Vec<u8>>, - resize_r: async_std::channel::Receiver<(u16, u16)>, - event_w: async_std::channel::Sender<crate::event::Event>, - ) -> Self { - let pty = pty_process::Pty::new().unwrap(); - pty.resize(pty_process::Size::new(size.0, size.1)).unwrap(); + fn new() -> Self { Self { - entry, - pty: async_std::sync::Arc::new(pty), - input_r, - resize_r, - event_w, - latest_status: async_std::process::ExitStatus::from_raw(0), } } - async fn entry(&self) -> async_std::sync::MutexGuardArc<Entry> { - self.entry.lock_arc().await - } - - async fn read_pty(&self, buf: &mut [u8]) -> std::io::Result<usize> { - (&*self.pty).read(buf).await - } - - async fn write_pty(&self, buf: &[u8]) -> std::io::Result<usize> { - (&*self.pty).write(buf).await - } - - async fn read_input( - &self, - ) -> Result<Vec<u8>, async_std::channel::RecvError> { - self.input_r.recv().await - } - - async fn read_resize( - &self, - ) -> Result<(u16, u16), async_std::channel::RecvError> { - self.resize_r.recv().await - } - - async fn send_event(&self, event: crate::event::Event) { - self.event_w.send(event).await.unwrap(); - } - fn set_status(&mut self, status: async_std::process::ExitStatus) { self.latest_status = status; } @@ -612,53 +565,53 @@ impl ProcessEnv { } } -fn run_commands(ast: crate::parse::Commands, mut env: ProcessEnv) { +fn run_commands( + ast: crate::parse::Commands, + pty: pty::Pty, + entry: async_std::sync::Arc<async_std::sync::Mutex<Entry>>, + mut env: ProcessEnv, + close_w: async_std::channel::Sender<()>, +) { async_std::task::spawn(async move { for pipeline in ast.pipelines() { - let (pipeline_status, done) = run_pipeline(pipeline, &env).await; + let (pipeline_status, done) = + run_pipeline(pipeline, &pty, &env).await; env.set_status(pipeline_status); if done { break; } } - env.entry().await.exit_info = + entry.lock_arc().await.exit_info = Some(ExitInfo::new(*env.latest_status())); - env.send_event(crate::event::Event::ProcessExit).await; + close_w.send(()).await.unwrap(); }); } async fn run_pipeline( pipeline: &crate::parse::Pipeline, + pty: &pty::Pty, env: &ProcessEnv, ) -> (async_std::process::ExitStatus, bool) { - let status = if pipeline.exes().len() == 1 { - run_exe(&pipeline.exes()[0], env).await - } else { - let mut cmd = - pty_process::Command::new(std::env::current_exe().unwrap()); - cmd.arg("--internal-pipe-runner"); - let (r, w) = - nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).unwrap(); - unsafe { - cmd.pre_exec(move || { - nix::unistd::dup2(r, 3)?; - Ok(()) - }); - } - let child = cmd.spawn(&env.pty).unwrap(); - nix::unistd::close(r).unwrap(); - - let mut w = unsafe { async_std::fs::File::from_raw_fd(w) }; - let pipeline = pipeline.clone(); - let fut = async move { - w.write_all(pipeline.input_string().as_bytes()) - .await - .unwrap(); - drop(w); - child.status_no_drop().await.unwrap() - }; - run_future(fut, env.clone()).await - }; + let mut cmd = pty_process::Command::new(std::env::current_exe().unwrap()); + cmd.arg("--internal-pipe-runner"); + let (r, w) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).unwrap(); + unsafe { + cmd.pre_exec(move || { + nix::unistd::dup2(r, 3)?; + Ok(()) + }); + } + let child = pty.spawn(cmd).unwrap(); + nix::unistd::close(r).unwrap(); + + let mut w = unsafe { async_std::fs::File::from_raw_fd(w) }; + w.write_all(pipeline.input_string().as_bytes()) + .await + .unwrap(); + // todo: write contents of env also + drop(w); + + let status = child.status_no_drop().await.unwrap(); ( status, // i'm not sure what exactly the expected behavior here is - in zsh, @@ -669,138 +622,6 @@ async fn run_pipeline( ) } -async fn run_exe( - exe: &crate::parse::Exe, - env: &ProcessEnv, -) -> async_std::process::ExitStatus { - if let Some(fut) = builtins::run(exe, env) { - run_future(fut, env.clone()).await - } else { - let fut = { - let exe = exe.clone(); - let env = env.clone(); - async move { run_binary(&exe, &env).await } - }; - run_future(fut, env.clone()).await - } -} - -async fn run_future<F>( - fut: F, - env: ProcessEnv, -) -> async_std::process::ExitStatus -where - F: std::future::Future<Output = async_std::process::ExitStatus> - + Sync - + Send - + 'static, -{ - let (exit_w, exit_r) = async_std::channel::unbounded(); - async_std::task::spawn(async move { - let status = fut.await; - exit_w.send(status).await.unwrap(); - }); - loop { - enum Res { - Read(Result<usize, std::io::Error>), - Write(Result<Vec<u8>, async_std::channel::RecvError>), - Resize(Result<(u16, u16), async_std::channel::RecvError>), - Exit( - Result< - async_std::process::ExitStatus, - async_std::channel::RecvError, - >, - ), - } - let mut buf = [0_u8; 4096]; - let read = async { Res::Read(env.read_pty(&mut buf).await) }; - let write = async { Res::Write(env.read_input().await) }; - let resize = async { Res::Resize(env.read_resize().await) }; - let exit = async { Res::Exit(exit_r.recv().await) }; - match read.race(write).race(resize).or(exit).await { - Res::Read(res) => match res { - Ok(bytes) => { - let mut entry = env.entry().await; - let pre_alternate_screen = - entry.vt.screen().alternate_screen(); - entry.vt.process(&buf[..bytes]); - let post_alternate_screen = - entry.vt.screen().alternate_screen(); - if entry.fullscreen.is_none() - && pre_alternate_screen != post_alternate_screen - { - env.send_event( - crate::event::Event::ProcessAlternateScreen, - ) - .await; - } - env.send_event(crate::event::Event::ProcessOutput).await; - } - Err(e) => { - if e.raw_os_error() != Some(libc::EIO) { - panic!("pty read failed: {:?}", e); - } - } - }, - Res::Write(res) => match res { - Ok(bytes) => { - env.write_pty(&bytes).await.unwrap(); - } - Err(e) => { - panic!("failed to read from input channel: {}", e); - } - }, - Res::Resize(res) => match res { - Ok(size) => { - env.pty - .resize(pty_process::Size::new(size.0, size.1)) - .unwrap(); - env.entry().await.vt.set_size(size.0, size.1); - } - Err(e) => { - panic!("failed to read from resize channel: {}", e); - } - }, - Res::Exit(res) => match res { - Ok(status) => { - return status; - } - Err(e) => { - panic!("failed to get exit status: {}", e); - } - }, - } - } -} - -async fn run_binary( - exe: &crate::parse::Exe, - env: &ProcessEnv, -) -> async_std::process::ExitStatus { - let mut process = pty_process::Command::new(exe.exe()); - process.args(exe.args()); - let child = process.spawn(&env.pty); - let child = match child { - Ok(child) => child, - Err(e) => { - let exe = exe.clone(); - let env = env.clone(); - let e_str = if let pty_process::Error::Io(e) = e { - crate::format::io_error(&e) - } else { - e.to_string() - }; - env.write_pty( - format!("nbsh: {}: {}\n", e_str, exe.exe()).as_bytes(), - ) - .await - .unwrap(); - return async_std::process::ExitStatus::from_raw(1 << 8); - } - }; - child.status_no_drop().await.unwrap() -} - fn set_bgcolor(out: &mut impl textmode::Textmode, idx: usize, focus: bool) { if focus { out.set_bgcolor(textmode::Color::Rgb(0x56, 0x1b, 0x8b)); diff --git a/src/state/history/pty.rs b/src/state/history/pty.rs new file mode 100644 index 0000000..259b280 --- /dev/null +++ b/src/state/history/pty.rs @@ -0,0 +1,128 @@ +use async_std::io::{ReadExt as _, WriteExt as _}; +use futures_lite::future::FutureExt as _; + +pub struct Pty { + pty: async_std::sync::Arc<pty_process::Pty>, +} + +impl Pty { + pub fn new( + size: (u16, u16), + entry: &async_std::sync::Arc<async_std::sync::Mutex<super::Entry>>, + input_r: async_std::channel::Receiver<Vec<u8>>, + resize_r: async_std::channel::Receiver<(u16, u16)>, + close_r: async_std::channel::Receiver<()>, + event_w: async_std::channel::Sender<crate::event::Event>, + ) -> anyhow::Result<Self> { + let pty = pty_process::Pty::new()?; + pty.resize(pty_process::Size::new(size.0, size.1))?; + let pty = async_std::sync::Arc::new(pty); + + { + let entry = async_std::sync::Arc::clone(entry); + let pty = async_std::sync::Arc::clone(&pty); + async_std::task::spawn(async move { + loop { + enum Res { + Read(Result<usize, std::io::Error>), + Write(Result<Vec<u8>, async_std::channel::RecvError>), + Resize( + Result<(u16, u16), async_std::channel::RecvError>, + ), + Close(Result<(), async_std::channel::RecvError>), + } + let mut buf = [0_u8; 4096]; + let read = + async { Res::Read((&*pty).read(&mut buf).await) }; + let write = async { Res::Write(input_r.recv().await) }; + let resize = async { Res::Resize(resize_r.recv().await) }; + let close = async { Res::Close(close_r.recv().await) }; + match read.race(write).race(resize).or(close).await { + Res::Read(res) => match res { + Ok(bytes) => { + let mut entry = entry.lock_arc().await; + let pre_alternate_screen = + entry.vt.screen().alternate_screen(); + entry.vt.process(&buf[..bytes]); + let post_alternate_screen = + entry.vt.screen().alternate_screen(); + if entry.fullscreen.is_none() + && pre_alternate_screen + != post_alternate_screen + { + event_w.send( + crate::event::Event::ProcessAlternateScreen, + ) + .await + .unwrap(); + } + event_w + .send(crate::event::Event::ProcessOutput) + .await + .unwrap(); + } + Err(e) => { + if e.raw_os_error() != Some(libc::EIO) { + panic!("pty read failed: {:?}", e); + } + } + }, + Res::Write(res) => { + match res { + Ok(bytes) => { + (&*pty).write(&bytes).await.unwrap(); + } + Err(e) => { + panic!("failed to read from input channel: {}", e); + } + } + } + Res::Resize(res) => match res { + Ok(size) => { + pty.resize(pty_process::Size::new( + size.0, size.1, + )) + .unwrap(); + entry + .lock_arc() + .await + .vt + .set_size(size.0, size.1); + } + Err(e) => { + panic!( + "failed to read from resize channel: {}", + e + ); + } + }, + Res::Close(res) => match res { + Ok(()) => { + event_w + .send(crate::event::Event::ProcessExit) + .await + .unwrap(); + return; + } + Err(e) => { + panic!( + "failed to read from close channel: {}", + e + ); + } + }, + } + } + }); + } + + Ok(Self { pty }) + } + + pub fn spawn( + &self, + mut cmd: pty_process::Command, + ) -> anyhow::Result<async_std::process::Child> { + Ok(cmd.spawn(&self.pty)?) + } +} |