summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/state/history/builtins.rs35
-rw-r--r--src/state/history/mod.rs265
-rw-r--r--src/state/history/pty.rs128
3 files changed, 192 insertions, 236 deletions
diff --git a/src/state/history/builtins.rs b/src/state/history/builtins.rs
index 49043cd..329aa97 100644
--- a/src/state/history/builtins.rs
+++ b/src/state/history/builtins.rs
@@ -1,3 +1,4 @@
+use async_std::io::WriteExt as _;
use std::os::unix::process::ExitStatusExt as _;
type Builtin = &'static (dyn for<'a> Fn(
@@ -99,7 +100,10 @@ async fn cd(
home().join(path.strip_prefix(prefix).unwrap())
} else {
// TODO
- env.write_pty(b"unimplemented\n").await.unwrap();
+ async_std::io::stderr()
+ .write(b"unimplemented\n")
+ .await
+ .unwrap();
return async_std::process::ExitStatus::from_raw(1 << 8);
}
} else {
@@ -111,17 +115,18 @@ async fn cd(
let code = match std::env::set_current_dir(&dir) {
Ok(()) => 0,
Err(e) => {
- env.write_pty(
- format!(
- "{}: {}: {}\n",
- exe.exe(),
- crate::format::io_error(&e),
- dir.display()
+ async_std::io::stderr()
+ .write(
+ format!(
+ "{}: {}: {}\n",
+ exe.exe(),
+ crate::format::io_error(&e),
+ dir.display()
+ )
+ .as_bytes(),
)
- .as_bytes(),
- )
- .await
- .unwrap();
+ .await
+ .unwrap();
1
}
};
@@ -134,7 +139,8 @@ async fn and(
) -> async_std::process::ExitStatus {
let exe = exe.shift();
if env.latest_status().success() {
- super::run_exe(&exe, env).await
+ todo!()
+ // super::run_exe(&exe, env).await
} else {
*env.latest_status()
}
@@ -148,7 +154,8 @@ async fn or(
if env.latest_status().success() {
*env.latest_status()
} else {
- super::run_exe(&exe, env).await
+ todo!()
+ // super::run_exe(&exe, env).await
}
}
@@ -157,7 +164,7 @@ async fn command(
env: &super::ProcessEnv,
) -> async_std::process::ExitStatus {
let exe = exe.shift();
- super::run_binary(&exe, env).await;
+ // super::run_binary(&exe, env).await;
*env.latest_status()
}
diff --git a/src/state/history/mod.rs b/src/state/history/mod.rs
index 1912964..6263a1f 100644
--- a/src/state/history/mod.rs
+++ b/src/state/history/mod.rs
@@ -1,9 +1,9 @@
-use async_std::io::{ReadExt as _, WriteExt as _};
-use futures_lite::future::FutureExt as _;
+use async_std::io::WriteExt as _;
use std::os::unix::io::FromRawFd as _;
use std::os::unix::process::ExitStatusExt as _;
mod builtins;
+mod pty;
pub struct History {
size: (u16, u16),
@@ -93,20 +93,21 @@ impl History {
) -> anyhow::Result<usize> {
let (input_w, input_r) = async_std::channel::unbounded();
let (resize_w, resize_r) = async_std::channel::unbounded();
+ let (close_w, close_r) = async_std::channel::unbounded();
let entry = async_std::sync::Arc::new(async_std::sync::Mutex::new(
Entry::new(Ok(ast.clone()), self.size, input_w, resize_w),
));
+ let pty = pty::Pty::new(
+ self.size, &entry, input_r, resize_r, close_r, event_w,
+ )?;
run_commands(
ast.clone(),
- ProcessEnv::new(
- async_std::sync::Arc::clone(&entry),
- self.size,
- input_r,
- resize_r,
- event_w,
- ),
+ pty,
+ async_std::sync::Arc::clone(&entry),
+ ProcessEnv::new(),
+ close_w,
);
self.entries.push(entry);
@@ -545,64 +546,16 @@ impl ExitInfo {
#[derive(Clone)]
pub struct ProcessEnv {
- entry: async_std::sync::Arc<async_std::sync::Mutex<Entry>>,
- pty: async_std::sync::Arc<pty_process::Pty>,
- input_r: async_std::channel::Receiver<Vec<u8>>,
- resize_r: async_std::channel::Receiver<(u16, u16)>,
- event_w: async_std::channel::Sender<crate::event::Event>,
-
latest_status: async_std::process::ExitStatus,
}
impl ProcessEnv {
- fn new(
- entry: async_std::sync::Arc<async_std::sync::Mutex<Entry>>,
- size: (u16, u16),
- input_r: async_std::channel::Receiver<Vec<u8>>,
- resize_r: async_std::channel::Receiver<(u16, u16)>,
- event_w: async_std::channel::Sender<crate::event::Event>,
- ) -> Self {
- let pty = pty_process::Pty::new().unwrap();
- pty.resize(pty_process::Size::new(size.0, size.1)).unwrap();
+ fn new() -> Self {
Self {
- entry,
- pty: async_std::sync::Arc::new(pty),
- input_r,
- resize_r,
- event_w,
-
latest_status: async_std::process::ExitStatus::from_raw(0),
}
}
- async fn entry(&self) -> async_std::sync::MutexGuardArc<Entry> {
- self.entry.lock_arc().await
- }
-
- async fn read_pty(&self, buf: &mut [u8]) -> std::io::Result<usize> {
- (&*self.pty).read(buf).await
- }
-
- async fn write_pty(&self, buf: &[u8]) -> std::io::Result<usize> {
- (&*self.pty).write(buf).await
- }
-
- async fn read_input(
- &self,
- ) -> Result<Vec<u8>, async_std::channel::RecvError> {
- self.input_r.recv().await
- }
-
- async fn read_resize(
- &self,
- ) -> Result<(u16, u16), async_std::channel::RecvError> {
- self.resize_r.recv().await
- }
-
- async fn send_event(&self, event: crate::event::Event) {
- self.event_w.send(event).await.unwrap();
- }
-
fn set_status(&mut self, status: async_std::process::ExitStatus) {
self.latest_status = status;
}
@@ -612,53 +565,53 @@ impl ProcessEnv {
}
}
-fn run_commands(ast: crate::parse::Commands, mut env: ProcessEnv) {
+fn run_commands(
+ ast: crate::parse::Commands,
+ pty: pty::Pty,
+ entry: async_std::sync::Arc<async_std::sync::Mutex<Entry>>,
+ mut env: ProcessEnv,
+ close_w: async_std::channel::Sender<()>,
+) {
async_std::task::spawn(async move {
for pipeline in ast.pipelines() {
- let (pipeline_status, done) = run_pipeline(pipeline, &env).await;
+ let (pipeline_status, done) =
+ run_pipeline(pipeline, &pty, &env).await;
env.set_status(pipeline_status);
if done {
break;
}
}
- env.entry().await.exit_info =
+ entry.lock_arc().await.exit_info =
Some(ExitInfo::new(*env.latest_status()));
- env.send_event(crate::event::Event::ProcessExit).await;
+ close_w.send(()).await.unwrap();
});
}
async fn run_pipeline(
pipeline: &crate::parse::Pipeline,
+ pty: &pty::Pty,
env: &ProcessEnv,
) -> (async_std::process::ExitStatus, bool) {
- let status = if pipeline.exes().len() == 1 {
- run_exe(&pipeline.exes()[0], env).await
- } else {
- let mut cmd =
- pty_process::Command::new(std::env::current_exe().unwrap());
- cmd.arg("--internal-pipe-runner");
- let (r, w) =
- nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).unwrap();
- unsafe {
- cmd.pre_exec(move || {
- nix::unistd::dup2(r, 3)?;
- Ok(())
- });
- }
- let child = cmd.spawn(&env.pty).unwrap();
- nix::unistd::close(r).unwrap();
-
- let mut w = unsafe { async_std::fs::File::from_raw_fd(w) };
- let pipeline = pipeline.clone();
- let fut = async move {
- w.write_all(pipeline.input_string().as_bytes())
- .await
- .unwrap();
- drop(w);
- child.status_no_drop().await.unwrap()
- };
- run_future(fut, env.clone()).await
- };
+ let mut cmd = pty_process::Command::new(std::env::current_exe().unwrap());
+ cmd.arg("--internal-pipe-runner");
+ let (r, w) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).unwrap();
+ unsafe {
+ cmd.pre_exec(move || {
+ nix::unistd::dup2(r, 3)?;
+ Ok(())
+ });
+ }
+ let child = pty.spawn(cmd).unwrap();
+ nix::unistd::close(r).unwrap();
+
+ let mut w = unsafe { async_std::fs::File::from_raw_fd(w) };
+ w.write_all(pipeline.input_string().as_bytes())
+ .await
+ .unwrap();
+ // todo: write contents of env also
+ drop(w);
+
+ let status = child.status_no_drop().await.unwrap();
(
status,
// i'm not sure what exactly the expected behavior here is - in zsh,
@@ -669,138 +622,6 @@ async fn run_pipeline(
)
}
-async fn run_exe(
- exe: &crate::parse::Exe,
- env: &ProcessEnv,
-) -> async_std::process::ExitStatus {
- if let Some(fut) = builtins::run(exe, env) {
- run_future(fut, env.clone()).await
- } else {
- let fut = {
- let exe = exe.clone();
- let env = env.clone();
- async move { run_binary(&exe, &env).await }
- };
- run_future(fut, env.clone()).await
- }
-}
-
-async fn run_future<F>(
- fut: F,
- env: ProcessEnv,
-) -> async_std::process::ExitStatus
-where
- F: std::future::Future<Output = async_std::process::ExitStatus>
- + Sync
- + Send
- + 'static,
-{
- let (exit_w, exit_r) = async_std::channel::unbounded();
- async_std::task::spawn(async move {
- let status = fut.await;
- exit_w.send(status).await.unwrap();
- });
- loop {
- enum Res {
- Read(Result<usize, std::io::Error>),
- Write(Result<Vec<u8>, async_std::channel::RecvError>),
- Resize(Result<(u16, u16), async_std::channel::RecvError>),
- Exit(
- Result<
- async_std::process::ExitStatus,
- async_std::channel::RecvError,
- >,
- ),
- }
- let mut buf = [0_u8; 4096];
- let read = async { Res::Read(env.read_pty(&mut buf).await) };
- let write = async { Res::Write(env.read_input().await) };
- let resize = async { Res::Resize(env.read_resize().await) };
- let exit = async { Res::Exit(exit_r.recv().await) };
- match read.race(write).race(resize).or(exit).await {
- Res::Read(res) => match res {
- Ok(bytes) => {
- let mut entry = env.entry().await;
- let pre_alternate_screen =
- entry.vt.screen().alternate_screen();
- entry.vt.process(&buf[..bytes]);
- let post_alternate_screen =
- entry.vt.screen().alternate_screen();
- if entry.fullscreen.is_none()
- && pre_alternate_screen != post_alternate_screen
- {
- env.send_event(
- crate::event::Event::ProcessAlternateScreen,
- )
- .await;
- }
- env.send_event(crate::event::Event::ProcessOutput).await;
- }
- Err(e) => {
- if e.raw_os_error() != Some(libc::EIO) {
- panic!("pty read failed: {:?}", e);
- }
- }
- },
- Res::Write(res) => match res {
- Ok(bytes) => {
- env.write_pty(&bytes).await.unwrap();
- }
- Err(e) => {
- panic!("failed to read from input channel: {}", e);
- }
- },
- Res::Resize(res) => match res {
- Ok(size) => {
- env.pty
- .resize(pty_process::Size::new(size.0, size.1))
- .unwrap();
- env.entry().await.vt.set_size(size.0, size.1);
- }
- Err(e) => {
- panic!("failed to read from resize channel: {}", e);
- }
- },
- Res::Exit(res) => match res {
- Ok(status) => {
- return status;
- }
- Err(e) => {
- panic!("failed to get exit status: {}", e);
- }
- },
- }
- }
-}
-
-async fn run_binary(
- exe: &crate::parse::Exe,
- env: &ProcessEnv,
-) -> async_std::process::ExitStatus {
- let mut process = pty_process::Command::new(exe.exe());
- process.args(exe.args());
- let child = process.spawn(&env.pty);
- let child = match child {
- Ok(child) => child,
- Err(e) => {
- let exe = exe.clone();
- let env = env.clone();
- let e_str = if let pty_process::Error::Io(e) = e {
- crate::format::io_error(&e)
- } else {
- e.to_string()
- };
- env.write_pty(
- format!("nbsh: {}: {}\n", e_str, exe.exe()).as_bytes(),
- )
- .await
- .unwrap();
- return async_std::process::ExitStatus::from_raw(1 << 8);
- }
- };
- child.status_no_drop().await.unwrap()
-}
-
fn set_bgcolor(out: &mut impl textmode::Textmode, idx: usize, focus: bool) {
if focus {
out.set_bgcolor(textmode::Color::Rgb(0x56, 0x1b, 0x8b));
diff --git a/src/state/history/pty.rs b/src/state/history/pty.rs
new file mode 100644
index 0000000..259b280
--- /dev/null
+++ b/src/state/history/pty.rs
@@ -0,0 +1,128 @@
+use async_std::io::{ReadExt as _, WriteExt as _};
+use futures_lite::future::FutureExt as _;
+
+pub struct Pty {
+ pty: async_std::sync::Arc<pty_process::Pty>,
+}
+
+impl Pty {
+ pub fn new(
+ size: (u16, u16),
+ entry: &async_std::sync::Arc<async_std::sync::Mutex<super::Entry>>,
+ input_r: async_std::channel::Receiver<Vec<u8>>,
+ resize_r: async_std::channel::Receiver<(u16, u16)>,
+ close_r: async_std::channel::Receiver<()>,
+ event_w: async_std::channel::Sender<crate::event::Event>,
+ ) -> anyhow::Result<Self> {
+ let pty = pty_process::Pty::new()?;
+ pty.resize(pty_process::Size::new(size.0, size.1))?;
+ let pty = async_std::sync::Arc::new(pty);
+
+ {
+ let entry = async_std::sync::Arc::clone(entry);
+ let pty = async_std::sync::Arc::clone(&pty);
+ async_std::task::spawn(async move {
+ loop {
+ enum Res {
+ Read(Result<usize, std::io::Error>),
+ Write(Result<Vec<u8>, async_std::channel::RecvError>),
+ Resize(
+ Result<(u16, u16), async_std::channel::RecvError>,
+ ),
+ Close(Result<(), async_std::channel::RecvError>),
+ }
+ let mut buf = [0_u8; 4096];
+ let read =
+ async { Res::Read((&*pty).read(&mut buf).await) };
+ let write = async { Res::Write(input_r.recv().await) };
+ let resize = async { Res::Resize(resize_r.recv().await) };
+ let close = async { Res::Close(close_r.recv().await) };
+ match read.race(write).race(resize).or(close).await {
+ Res::Read(res) => match res {
+ Ok(bytes) => {
+ let mut entry = entry.lock_arc().await;
+ let pre_alternate_screen =
+ entry.vt.screen().alternate_screen();
+ entry.vt.process(&buf[..bytes]);
+ let post_alternate_screen =
+ entry.vt.screen().alternate_screen();
+ if entry.fullscreen.is_none()
+ && pre_alternate_screen
+ != post_alternate_screen
+ {
+ event_w.send(
+ crate::event::Event::ProcessAlternateScreen,
+ )
+ .await
+ .unwrap();
+ }
+ event_w
+ .send(crate::event::Event::ProcessOutput)
+ .await
+ .unwrap();
+ }
+ Err(e) => {
+ if e.raw_os_error() != Some(libc::EIO) {
+ panic!("pty read failed: {:?}", e);
+ }
+ }
+ },
+ Res::Write(res) => {
+ match res {
+ Ok(bytes) => {
+ (&*pty).write(&bytes).await.unwrap();
+ }
+ Err(e) => {
+ panic!("failed to read from input channel: {}", e);
+ }
+ }
+ }
+ Res::Resize(res) => match res {
+ Ok(size) => {
+ pty.resize(pty_process::Size::new(
+ size.0, size.1,
+ ))
+ .unwrap();
+ entry
+ .lock_arc()
+ .await
+ .vt
+ .set_size(size.0, size.1);
+ }
+ Err(e) => {
+ panic!(
+ "failed to read from resize channel: {}",
+ e
+ );
+ }
+ },
+ Res::Close(res) => match res {
+ Ok(()) => {
+ event_w
+ .send(crate::event::Event::ProcessExit)
+ .await
+ .unwrap();
+ return;
+ }
+ Err(e) => {
+ panic!(
+ "failed to read from close channel: {}",
+ e
+ );
+ }
+ },
+ }
+ }
+ });
+ }
+
+ Ok(Self { pty })
+ }
+
+ pub fn spawn(
+ &self,
+ mut cmd: pty_process::Command,
+ ) -> anyhow::Result<async_std::process::Child> {
+ Ok(cmd.spawn(&self.pty)?)
+ }
+}