summaryrefslogtreecommitdiffstats
path: root/src/state/history/mod.rs
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2022-01-02 19:12:22 -0500
committerJesse Luehrs <doy@tozt.net>2022-01-02 19:12:22 -0500
commit758cc746825770eff16bb7f6317297efd3cc0f27 (patch)
treec63dde130c8b238abaf664435051df6333cf3027 /src/state/history/mod.rs
parent742013492997e882ef86fdf2ba083f15c7fdea88 (diff)
downloadnbsh-758cc746825770eff16bb7f6317297efd3cc0f27.tar.gz
nbsh-758cc746825770eff16bb7f6317297efd3cc0f27.zip
make all commands go through the pipe runner
this will be necessary to support sigtstp, etc (since session leaders do not suspend when they receive sigtstp). builtins are temporarily broken, their support needs to be rewritten
Diffstat (limited to 'src/state/history/mod.rs')
-rw-r--r--src/state/history/mod.rs265
1 files changed, 43 insertions, 222 deletions
diff --git a/src/state/history/mod.rs b/src/state/history/mod.rs
index 1912964..6263a1f 100644
--- a/src/state/history/mod.rs
+++ b/src/state/history/mod.rs
@@ -1,9 +1,9 @@
-use async_std::io::{ReadExt as _, WriteExt as _};
-use futures_lite::future::FutureExt as _;
+use async_std::io::WriteExt as _;
use std::os::unix::io::FromRawFd as _;
use std::os::unix::process::ExitStatusExt as _;
mod builtins;
+mod pty;
pub struct History {
size: (u16, u16),
@@ -93,20 +93,21 @@ impl History {
) -> anyhow::Result<usize> {
let (input_w, input_r) = async_std::channel::unbounded();
let (resize_w, resize_r) = async_std::channel::unbounded();
+ let (close_w, close_r) = async_std::channel::unbounded();
let entry = async_std::sync::Arc::new(async_std::sync::Mutex::new(
Entry::new(Ok(ast.clone()), self.size, input_w, resize_w),
));
+ let pty = pty::Pty::new(
+ self.size, &entry, input_r, resize_r, close_r, event_w,
+ )?;
run_commands(
ast.clone(),
- ProcessEnv::new(
- async_std::sync::Arc::clone(&entry),
- self.size,
- input_r,
- resize_r,
- event_w,
- ),
+ pty,
+ async_std::sync::Arc::clone(&entry),
+ ProcessEnv::new(),
+ close_w,
);
self.entries.push(entry);
@@ -545,64 +546,16 @@ impl ExitInfo {
#[derive(Clone)]
pub struct ProcessEnv {
- entry: async_std::sync::Arc<async_std::sync::Mutex<Entry>>,
- pty: async_std::sync::Arc<pty_process::Pty>,
- input_r: async_std::channel::Receiver<Vec<u8>>,
- resize_r: async_std::channel::Receiver<(u16, u16)>,
- event_w: async_std::channel::Sender<crate::event::Event>,
-
latest_status: async_std::process::ExitStatus,
}
impl ProcessEnv {
- fn new(
- entry: async_std::sync::Arc<async_std::sync::Mutex<Entry>>,
- size: (u16, u16),
- input_r: async_std::channel::Receiver<Vec<u8>>,
- resize_r: async_std::channel::Receiver<(u16, u16)>,
- event_w: async_std::channel::Sender<crate::event::Event>,
- ) -> Self {
- let pty = pty_process::Pty::new().unwrap();
- pty.resize(pty_process::Size::new(size.0, size.1)).unwrap();
+ fn new() -> Self {
Self {
- entry,
- pty: async_std::sync::Arc::new(pty),
- input_r,
- resize_r,
- event_w,
-
latest_status: async_std::process::ExitStatus::from_raw(0),
}
}
- async fn entry(&self) -> async_std::sync::MutexGuardArc<Entry> {
- self.entry.lock_arc().await
- }
-
- async fn read_pty(&self, buf: &mut [u8]) -> std::io::Result<usize> {
- (&*self.pty).read(buf).await
- }
-
- async fn write_pty(&self, buf: &[u8]) -> std::io::Result<usize> {
- (&*self.pty).write(buf).await
- }
-
- async fn read_input(
- &self,
- ) -> Result<Vec<u8>, async_std::channel::RecvError> {
- self.input_r.recv().await
- }
-
- async fn read_resize(
- &self,
- ) -> Result<(u16, u16), async_std::channel::RecvError> {
- self.resize_r.recv().await
- }
-
- async fn send_event(&self, event: crate::event::Event) {
- self.event_w.send(event).await.unwrap();
- }
-
fn set_status(&mut self, status: async_std::process::ExitStatus) {
self.latest_status = status;
}
@@ -612,53 +565,53 @@ impl ProcessEnv {
}
}
-fn run_commands(ast: crate::parse::Commands, mut env: ProcessEnv) {
+fn run_commands(
+ ast: crate::parse::Commands,
+ pty: pty::Pty,
+ entry: async_std::sync::Arc<async_std::sync::Mutex<Entry>>,
+ mut env: ProcessEnv,
+ close_w: async_std::channel::Sender<()>,
+) {
async_std::task::spawn(async move {
for pipeline in ast.pipelines() {
- let (pipeline_status, done) = run_pipeline(pipeline, &env).await;
+ let (pipeline_status, done) =
+ run_pipeline(pipeline, &pty, &env).await;
env.set_status(pipeline_status);
if done {
break;
}
}
- env.entry().await.exit_info =
+ entry.lock_arc().await.exit_info =
Some(ExitInfo::new(*env.latest_status()));
- env.send_event(crate::event::Event::ProcessExit).await;
+ close_w.send(()).await.unwrap();
});
}
async fn run_pipeline(
pipeline: &crate::parse::Pipeline,
+ pty: &pty::Pty,
env: &ProcessEnv,
) -> (async_std::process::ExitStatus, bool) {
- let status = if pipeline.exes().len() == 1 {
- run_exe(&pipeline.exes()[0], env).await
- } else {
- let mut cmd =
- pty_process::Command::new(std::env::current_exe().unwrap());
- cmd.arg("--internal-pipe-runner");
- let (r, w) =
- nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).unwrap();
- unsafe {
- cmd.pre_exec(move || {
- nix::unistd::dup2(r, 3)?;
- Ok(())
- });
- }
- let child = cmd.spawn(&env.pty).unwrap();
- nix::unistd::close(r).unwrap();
-
- let mut w = unsafe { async_std::fs::File::from_raw_fd(w) };
- let pipeline = pipeline.clone();
- let fut = async move {
- w.write_all(pipeline.input_string().as_bytes())
- .await
- .unwrap();
- drop(w);
- child.status_no_drop().await.unwrap()
- };
- run_future(fut, env.clone()).await
- };
+ let mut cmd = pty_process::Command::new(std::env::current_exe().unwrap());
+ cmd.arg("--internal-pipe-runner");
+ let (r, w) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).unwrap();
+ unsafe {
+ cmd.pre_exec(move || {
+ nix::unistd::dup2(r, 3)?;
+ Ok(())
+ });
+ }
+ let child = pty.spawn(cmd).unwrap();
+ nix::unistd::close(r).unwrap();
+
+ let mut w = unsafe { async_std::fs::File::from_raw_fd(w) };
+ w.write_all(pipeline.input_string().as_bytes())
+ .await
+ .unwrap();
+ // todo: write contents of env also
+ drop(w);
+
+ let status = child.status_no_drop().await.unwrap();
(
status,
// i'm not sure what exactly the expected behavior here is - in zsh,
@@ -669,138 +622,6 @@ async fn run_pipeline(
)
}
-async fn run_exe(
- exe: &crate::parse::Exe,
- env: &ProcessEnv,
-) -> async_std::process::ExitStatus {
- if let Some(fut) = builtins::run(exe, env) {
- run_future(fut, env.clone()).await
- } else {
- let fut = {
- let exe = exe.clone();
- let env = env.clone();
- async move { run_binary(&exe, &env).await }
- };
- run_future(fut, env.clone()).await
- }
-}
-
-async fn run_future<F>(
- fut: F,
- env: ProcessEnv,
-) -> async_std::process::ExitStatus
-where
- F: std::future::Future<Output = async_std::process::ExitStatus>
- + Sync
- + Send
- + 'static,
-{
- let (exit_w, exit_r) = async_std::channel::unbounded();
- async_std::task::spawn(async move {
- let status = fut.await;
- exit_w.send(status).await.unwrap();
- });
- loop {
- enum Res {
- Read(Result<usize, std::io::Error>),
- Write(Result<Vec<u8>, async_std::channel::RecvError>),
- Resize(Result<(u16, u16), async_std::channel::RecvError>),
- Exit(
- Result<
- async_std::process::ExitStatus,
- async_std::channel::RecvError,
- >,
- ),
- }
- let mut buf = [0_u8; 4096];
- let read = async { Res::Read(env.read_pty(&mut buf).await) };
- let write = async { Res::Write(env.read_input().await) };
- let resize = async { Res::Resize(env.read_resize().await) };
- let exit = async { Res::Exit(exit_r.recv().await) };
- match read.race(write).race(resize).or(exit).await {
- Res::Read(res) => match res {
- Ok(bytes) => {
- let mut entry = env.entry().await;
- let pre_alternate_screen =
- entry.vt.screen().alternate_screen();
- entry.vt.process(&buf[..bytes]);
- let post_alternate_screen =
- entry.vt.screen().alternate_screen();
- if entry.fullscreen.is_none()
- && pre_alternate_screen != post_alternate_screen
- {
- env.send_event(
- crate::event::Event::ProcessAlternateScreen,
- )
- .await;
- }
- env.send_event(crate::event::Event::ProcessOutput).await;
- }
- Err(e) => {
- if e.raw_os_error() != Some(libc::EIO) {
- panic!("pty read failed: {:?}", e);
- }
- }
- },
- Res::Write(res) => match res {
- Ok(bytes) => {
- env.write_pty(&bytes).await.unwrap();
- }
- Err(e) => {
- panic!("failed to read from input channel: {}", e);
- }
- },
- Res::Resize(res) => match res {
- Ok(size) => {
- env.pty
- .resize(pty_process::Size::new(size.0, size.1))
- .unwrap();
- env.entry().await.vt.set_size(size.0, size.1);
- }
- Err(e) => {
- panic!("failed to read from resize channel: {}", e);
- }
- },
- Res::Exit(res) => match res {
- Ok(status) => {
- return status;
- }
- Err(e) => {
- panic!("failed to get exit status: {}", e);
- }
- },
- }
- }
-}
-
-async fn run_binary(
- exe: &crate::parse::Exe,
- env: &ProcessEnv,
-) -> async_std::process::ExitStatus {
- let mut process = pty_process::Command::new(exe.exe());
- process.args(exe.args());
- let child = process.spawn(&env.pty);
- let child = match child {
- Ok(child) => child,
- Err(e) => {
- let exe = exe.clone();
- let env = env.clone();
- let e_str = if let pty_process::Error::Io(e) = e {
- crate::format::io_error(&e)
- } else {
- e.to_string()
- };
- env.write_pty(
- format!("nbsh: {}: {}\n", e_str, exe.exe()).as_bytes(),
- )
- .await
- .unwrap();
- return async_std::process::ExitStatus::from_raw(1 << 8);
- }
- };
- child.status_no_drop().await.unwrap()
-}
-
fn set_bgcolor(out: &mut impl textmode::Textmode, idx: usize, focus: bool) {
if focus {
out.set_bgcolor(textmode::Color::Rgb(0x56, 0x1b, 0x8b));