summaryrefslogtreecommitdiffstats
path: root/src/state/history
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2022-01-02 19:12:22 -0500
committerJesse Luehrs <doy@tozt.net>2022-01-02 19:12:22 -0500
commit758cc746825770eff16bb7f6317297efd3cc0f27 (patch)
treec63dde130c8b238abaf664435051df6333cf3027 /src/state/history
parent742013492997e882ef86fdf2ba083f15c7fdea88 (diff)
downloadnbsh-758cc746825770eff16bb7f6317297efd3cc0f27.tar.gz
nbsh-758cc746825770eff16bb7f6317297efd3cc0f27.zip
make all commands go through the pipe runner
this will be necessary to support sigtstp, etc (since session leaders do not suspend when they receive sigtstp). builtins are temporarily broken, their support needs to be rewritten
Diffstat (limited to 'src/state/history')
-rw-r--r--src/state/history/builtins.rs35
-rw-r--r--src/state/history/mod.rs265
-rw-r--r--src/state/history/pty.rs128
3 files changed, 192 insertions, 236 deletions
diff --git a/src/state/history/builtins.rs b/src/state/history/builtins.rs
index 49043cd..329aa97 100644
--- a/src/state/history/builtins.rs
+++ b/src/state/history/builtins.rs
@@ -1,3 +1,4 @@
+use async_std::io::WriteExt as _;
use std::os::unix::process::ExitStatusExt as _;
type Builtin = &'static (dyn for<'a> Fn(
@@ -99,7 +100,10 @@ async fn cd(
home().join(path.strip_prefix(prefix).unwrap())
} else {
// TODO
- env.write_pty(b"unimplemented\n").await.unwrap();
+ async_std::io::stderr()
+ .write(b"unimplemented\n")
+ .await
+ .unwrap();
return async_std::process::ExitStatus::from_raw(1 << 8);
}
} else {
@@ -111,17 +115,18 @@ async fn cd(
let code = match std::env::set_current_dir(&dir) {
Ok(()) => 0,
Err(e) => {
- env.write_pty(
- format!(
- "{}: {}: {}\n",
- exe.exe(),
- crate::format::io_error(&e),
- dir.display()
+ async_std::io::stderr()
+ .write(
+ format!(
+ "{}: {}: {}\n",
+ exe.exe(),
+ crate::format::io_error(&e),
+ dir.display()
+ )
+ .as_bytes(),
)
- .as_bytes(),
- )
- .await
- .unwrap();
+ .await
+ .unwrap();
1
}
};
@@ -134,7 +139,8 @@ async fn and(
) -> async_std::process::ExitStatus {
let exe = exe.shift();
if env.latest_status().success() {
- super::run_exe(&exe, env).await
+ todo!()
+ // super::run_exe(&exe, env).await
} else {
*env.latest_status()
}
@@ -148,7 +154,8 @@ async fn or(
if env.latest_status().success() {
*env.latest_status()
} else {
- super::run_exe(&exe, env).await
+ todo!()
+ // super::run_exe(&exe, env).await
}
}
@@ -157,7 +164,7 @@ async fn command(
env: &super::ProcessEnv,
) -> async_std::process::ExitStatus {
let exe = exe.shift();
- super::run_binary(&exe, env).await;
+ // super::run_binary(&exe, env).await;
*env.latest_status()
}
diff --git a/src/state/history/mod.rs b/src/state/history/mod.rs
index 1912964..6263a1f 100644
--- a/src/state/history/mod.rs
+++ b/src/state/history/mod.rs
@@ -1,9 +1,9 @@
-use async_std::io::{ReadExt as _, WriteExt as _};
-use futures_lite::future::FutureExt as _;
+use async_std::io::WriteExt as _;
use std::os::unix::io::FromRawFd as _;
use std::os::unix::process::ExitStatusExt as _;
mod builtins;
+mod pty;
pub struct History {
size: (u16, u16),
@@ -93,20 +93,21 @@ impl History {
) -> anyhow::Result<usize> {
let (input_w, input_r) = async_std::channel::unbounded();
let (resize_w, resize_r) = async_std::channel::unbounded();
+ let (close_w, close_r) = async_std::channel::unbounded();
let entry = async_std::sync::Arc::new(async_std::sync::Mutex::new(
Entry::new(Ok(ast.clone()), self.size, input_w, resize_w),
));
+ let pty = pty::Pty::new(
+ self.size, &entry, input_r, resize_r, close_r, event_w,
+ )?;
run_commands(
ast.clone(),
- ProcessEnv::new(
- async_std::sync::Arc::clone(&entry),
- self.size,
- input_r,
- resize_r,
- event_w,
- ),
+ pty,
+ async_std::sync::Arc::clone(&entry),
+ ProcessEnv::new(),
+ close_w,
);
self.entries.push(entry);
@@ -545,64 +546,16 @@ impl ExitInfo {
#[derive(Clone)]
pub struct ProcessEnv {
- entry: async_std::sync::Arc<async_std::sync::Mutex<Entry>>,
- pty: async_std::sync::Arc<pty_process::Pty>,
- input_r: async_std::channel::Receiver<Vec<u8>>,
- resize_r: async_std::channel::Receiver<(u16, u16)>,
- event_w: async_std::channel::Sender<crate::event::Event>,
-
latest_status: async_std::process::ExitStatus,
}
impl ProcessEnv {
- fn new(
- entry: async_std::sync::Arc<async_std::sync::Mutex<Entry>>,
- size: (u16, u16),
- input_r: async_std::channel::Receiver<Vec<u8>>,
- resize_r: async_std::channel::Receiver<(u16, u16)>,
- event_w: async_std::channel::Sender<crate::event::Event>,
- ) -> Self {
- let pty = pty_process::Pty::new().unwrap();
- pty.resize(pty_process::Size::new(size.0, size.1)).unwrap();
+ fn new() -> Self {
Self {
- entry,
- pty: async_std::sync::Arc::new(pty),
- input_r,
- resize_r,
- event_w,
-
latest_status: async_std::process::ExitStatus::from_raw(0),
}
}
- async fn entry(&self) -> async_std::sync::MutexGuardArc<Entry> {
- self.entry.lock_arc().await
- }
-
- async fn read_pty(&self, buf: &mut [u8]) -> std::io::Result<usize> {
- (&*self.pty).read(buf).await
- }
-
- async fn write_pty(&self, buf: &[u8]) -> std::io::Result<usize> {
- (&*self.pty).write(buf).await
- }
-
- async fn read_input(
- &self,
- ) -> Result<Vec<u8>, async_std::channel::RecvError> {
- self.input_r.recv().await
- }
-
- async fn read_resize(
- &self,
- ) -> Result<(u16, u16), async_std::channel::RecvError> {
- self.resize_r.recv().await
- }
-
- async fn send_event(&self, event: crate::event::Event) {
- self.event_w.send(event).await.unwrap();
- }
-
fn set_status(&mut self, status: async_std::process::ExitStatus) {
self.latest_status = status;
}
@@ -612,53 +565,53 @@ impl ProcessEnv {
}
}
-fn run_commands(ast: crate::parse::Commands, mut env: ProcessEnv) {
+fn run_commands(
+ ast: crate::parse::Commands,
+ pty: pty::Pty,
+ entry: async_std::sync::Arc<async_std::sync::Mutex<Entry>>,
+ mut env: ProcessEnv,
+ close_w: async_std::channel::Sender<()>,
+) {
async_std::task::spawn(async move {
for pipeline in ast.pipelines() {
- let (pipeline_status, done) = run_pipeline(pipeline, &env).await;
+ let (pipeline_status, done) =
+ run_pipeline(pipeline, &pty, &env).await;
env.set_status(pipeline_status);
if done {
break;
}
}
- env.entry().await.exit_info =
+ entry.lock_arc().await.exit_info =
Some(ExitInfo::new(*env.latest_status()));
- env.send_event(crate::event::Event::ProcessExit).await;
+ close_w.send(()).await.unwrap();
});
}
async fn run_pipeline(
pipeline: &crate::parse::Pipeline,
+ pty: &pty::Pty,
env: &ProcessEnv,
) -> (async_std::process::ExitStatus, bool) {
- let status = if pipeline.exes().len() == 1 {
- run_exe(&pipeline.exes()[0], env).await
- } else {
- let mut cmd =
- pty_process::Command::new(std::env::current_exe().unwrap());
- cmd.arg("--internal-pipe-runner");
- let (r, w) =
- nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).unwrap();
- unsafe {
- cmd.pre_exec(move || {
- nix::unistd::dup2(r, 3)?;
- Ok(())
- });
- }
- let child = cmd.spawn(&env.pty).unwrap();
- nix::unistd::close(r).unwrap();
-
- let mut w = unsafe { async_std::fs::File::from_raw_fd(w) };
- let pipeline = pipeline.clone();
- let fut = async move {
- w.write_all(pipeline.input_string().as_bytes())
- .await
- .unwrap();
- drop(w);
- child.status_no_drop().await.unwrap()
- };
- run_future(fut, env.clone()).await
- };
+ let mut cmd = pty_process::Command::new(std::env::current_exe().unwrap());
+ cmd.arg("--internal-pipe-runner");
+ let (r, w) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).unwrap();
+ unsafe {
+ cmd.pre_exec(move || {
+ nix::unistd::dup2(r, 3)?;
+ Ok(())
+ });
+ }
+ let child = pty.spawn(cmd).unwrap();
+ nix::unistd::close(r).unwrap();
+
+ let mut w = unsafe { async_std::fs::File::from_raw_fd(w) };
+ w.write_all(pipeline.input_string().as_bytes())
+ .await
+ .unwrap();
+ // todo: write contents of env also
+ drop(w);
+
+ let status = child.status_no_drop().await.unwrap();
(
status,
// i'm not sure what exactly the expected behavior here is - in zsh,
@@ -669,138 +622,6 @@ async fn run_pipeline(
)
}
-async fn run_exe(
- exe: &crate::parse::Exe,
- env: &ProcessEnv,
-) -> async_std::process::ExitStatus {
- if let Some(fut) = builtins::run(exe, env) {
- run_future(fut, env.clone()).await
- } else {
- let fut = {
- let exe = exe.clone();
- let env = env.clone();
- async move { run_binary(&exe, &env).await }
- };
- run_future(fut, env.clone()).await
- }
-}
-
-async fn run_future<F>(
- fut: F,
- env: ProcessEnv,
-) -> async_std::process::ExitStatus
-where
- F: std::future::Future<Output = async_std::process::ExitStatus>
- + Sync
- + Send
- + 'static,
-{
- let (exit_w, exit_r) = async_std::channel::unbounded();
- async_std::task::spawn(async move {
- let status = fut.await;
- exit_w.send(status).await.unwrap();
- });
- loop {
- enum Res {
- Read(Result<usize, std::io::Error>),
- Write(Result<Vec<u8>, async_std::channel::RecvError>),
- Resize(Result<(u16, u16), async_std::channel::RecvError>),
- Exit(
- Result<
- async_std::process::ExitStatus,
- async_std::channel::RecvError,
- >,
- ),
- }
- let mut buf = [0_u8; 4096];
- let read = async { Res::Read(env.read_pty(&mut buf).await) };
- let write = async { Res::Write(env.read_input().await) };
- let resize = async { Res::Resize(env.read_resize().await) };
- let exit = async { Res::Exit(exit_r.recv().await) };
- match read.race(write).race(resize).or(exit).await {
- Res::Read(res) => match res {
- Ok(bytes) => {
- let mut entry = env.entry().await;
- let pre_alternate_screen =
- entry.vt.screen().alternate_screen();
- entry.vt.process(&buf[..bytes]);
- let post_alternate_screen =
- entry.vt.screen().alternate_screen();
- if entry.fullscreen.is_none()
- && pre_alternate_screen != post_alternate_screen
- {
- env.send_event(
- crate::event::Event::ProcessAlternateScreen,
- )
- .await;
- }
- env.send_event(crate::event::Event::ProcessOutput).await;
- }
- Err(e) => {
- if e.raw_os_error() != Some(libc::EIO) {
- panic!("pty read failed: {:?}", e);
- }
- }
- },
- Res::Write(res) => match res {
- Ok(bytes) => {
- env.write_pty(&bytes).await.unwrap();
- }
- Err(e) => {
- panic!("failed to read from input channel: {}", e);
- }
- },
- Res::Resize(res) => match res {
- Ok(size) => {
- env.pty
- .resize(pty_process::Size::new(size.0, size.1))
- .unwrap();
- env.entry().await.vt.set_size(size.0, size.1);
- }
- Err(e) => {
- panic!("failed to read from resize channel: {}", e);
- }
- },
- Res::Exit(res) => match res {
- Ok(status) => {
- return status;
- }
- Err(e) => {
- panic!("failed to get exit status: {}", e);
- }
- },
- }
- }
-}
-
-async fn run_binary(
- exe: &crate::parse::Exe,
- env: &ProcessEnv,
-) -> async_std::process::ExitStatus {
- let mut process = pty_process::Command::new(exe.exe());
- process.args(exe.args());
- let child = process.spawn(&env.pty);
- let child = match child {
- Ok(child) => child,
- Err(e) => {
- let exe = exe.clone();
- let env = env.clone();
- let e_str = if let pty_process::Error::Io(e) = e {
- crate::format::io_error(&e)
- } else {
- e.to_string()
- };
- env.write_pty(
- format!("nbsh: {}: {}\n", e_str, exe.exe()).as_bytes(),
- )
- .await
- .unwrap();
- return async_std::process::ExitStatus::from_raw(1 << 8);
- }
- };
- child.status_no_drop().await.unwrap()
-}
-
fn set_bgcolor(out: &mut impl textmode::Textmode, idx: usize, focus: bool) {
if focus {
out.set_bgcolor(textmode::Color::Rgb(0x56, 0x1b, 0x8b));
diff --git a/src/state/history/pty.rs b/src/state/history/pty.rs
new file mode 100644
index 0000000..259b280
--- /dev/null
+++ b/src/state/history/pty.rs
@@ -0,0 +1,128 @@
+use async_std::io::{ReadExt as _, WriteExt as _};
+use futures_lite::future::FutureExt as _;
+
+pub struct Pty {
+ pty: async_std::sync::Arc<pty_process::Pty>,
+}
+
+impl Pty {
+ pub fn new(
+ size: (u16, u16),
+ entry: &async_std::sync::Arc<async_std::sync::Mutex<super::Entry>>,
+ input_r: async_std::channel::Receiver<Vec<u8>>,
+ resize_r: async_std::channel::Receiver<(u16, u16)>,
+ close_r: async_std::channel::Receiver<()>,
+ event_w: async_std::channel::Sender<crate::event::Event>,
+ ) -> anyhow::Result<Self> {
+ let pty = pty_process::Pty::new()?;
+ pty.resize(pty_process::Size::new(size.0, size.1))?;
+ let pty = async_std::sync::Arc::new(pty);
+
+ {
+ let entry = async_std::sync::Arc::clone(entry);
+ let pty = async_std::sync::Arc::clone(&pty);
+ async_std::task::spawn(async move {
+ loop {
+ enum Res {
+ Read(Result<usize, std::io::Error>),
+ Write(Result<Vec<u8>, async_std::channel::RecvError>),
+ Resize(
+ Result<(u16, u16), async_std::channel::RecvError>,
+ ),
+ Close(Result<(), async_std::channel::RecvError>),
+ }
+ let mut buf = [0_u8; 4096];
+ let read =
+ async { Res::Read((&*pty).read(&mut buf).await) };
+ let write = async { Res::Write(input_r.recv().await) };
+ let resize = async { Res::Resize(resize_r.recv().await) };
+ let close = async { Res::Close(close_r.recv().await) };
+ match read.race(write).race(resize).or(close).await {
+ Res::Read(res) => match res {
+ Ok(bytes) => {
+ let mut entry = entry.lock_arc().await;
+ let pre_alternate_screen =
+ entry.vt.screen().alternate_screen();
+ entry.vt.process(&buf[..bytes]);
+ let post_alternate_screen =
+ entry.vt.screen().alternate_screen();
+ if entry.fullscreen.is_none()
+ && pre_alternate_screen
+ != post_alternate_screen
+ {
+ event_w.send(
+ crate::event::Event::ProcessAlternateScreen,
+ )
+ .await
+ .unwrap();
+ }
+ event_w
+ .send(crate::event::Event::ProcessOutput)
+ .await
+ .unwrap();
+ }
+ Err(e) => {
+ if e.raw_os_error() != Some(libc::EIO) {
+ panic!("pty read failed: {:?}", e);
+ }
+ }
+ },
+ Res::Write(res) => {
+ match res {
+ Ok(bytes) => {
+ (&*pty).write(&bytes).await.unwrap();
+ }
+ Err(e) => {
+ panic!("failed to read from input channel: {}", e);
+ }
+ }
+ }
+ Res::Resize(res) => match res {
+ Ok(size) => {
+ pty.resize(pty_process::Size::new(
+ size.0, size.1,
+ ))
+ .unwrap();
+ entry
+ .lock_arc()
+ .await
+ .vt
+ .set_size(size.0, size.1);
+ }
+ Err(e) => {
+ panic!(
+ "failed to read from resize channel: {}",
+ e
+ );
+ }
+ },
+ Res::Close(res) => match res {
+ Ok(()) => {
+ event_w
+ .send(crate::event::Event::ProcessExit)
+ .await
+ .unwrap();
+ return;
+ }
+ Err(e) => {
+ panic!(
+ "failed to read from close channel: {}",
+ e
+ );
+ }
+ },
+ }
+ }
+ });
+ }
+
+ Ok(Self { pty })
+ }
+
+ pub fn spawn(
+ &self,
+ mut cmd: pty_process::Command,
+ ) -> anyhow::Result<async_std::process::Child> {
+ Ok(cmd.spawn(&self.pty)?)
+ }
+}