From ab79e981a0bcf6cd0670395bf0ee1877df2c2bc3 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Sat, 1 Jan 2022 16:52:14 -0500 Subject: clean up pipe code --- src/main.rs | 15 +++-- src/parse.rs | 12 ++++ src/pipe.rs | 208 +++++++++++++++++++++++++++++++++++++---------------------- 3 files changed, 152 insertions(+), 83 deletions(-) (limited to 'src') diff --git a/src/main.rs b/src/main.rs index a4a0773..2472901 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,11 +43,6 @@ fn get_offset() -> time::UtcOffset { } async fn async_main() -> anyhow::Result<()> { - if std::env::args().nth(1).as_deref() == Some("--internal-pipe-runner") { - pipe::run().await; - std::process::exit(0); - } - let mut input = textmode::Input::new().await?; let mut output = textmode::Output::new().await?; @@ -141,6 +136,16 @@ async fn async_main() -> anyhow::Result<()> { } fn main() { + if std::env::args().nth(1).as_deref() == Some("--internal-pipe-runner") { + match pipe::run() { + Ok(code) => std::process::exit(code), + Err(e) => { + eprintln!("nbsh: {}", e); + std::process::exit(1); + } + } + } + match async_std::task::block_on(async_main()) { Ok(_) => (), Err(e) => { diff --git a/src/parse.rs b/src/parse.rs index 526b6ce..0249d7f 100644 --- a/src/parse.rs +++ b/src/parse.rs @@ -155,3 +155,15 @@ impl Error { &self.e } } + +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "failed to parse {}: {}", self.input, self.e) + } +} + +impl std::error::Error for Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + Some(&*self.e) + } +} diff --git a/src/pipe.rs b/src/pipe.rs index f4b2d21..eafc4d8 100644 --- a/src/pipe.rs +++ b/src/pipe.rs @@ -1,94 +1,146 @@ -use async_std::io::prelude::ReadExt as _; -use async_std::os::unix::process::CommandExt as _; -use async_std::stream::StreamExt as _; +use std::io::Read as _; use std::os::unix::io::FromRawFd as _; -use std::os::unix::process::ExitStatusExt as _; +use std::os::unix::process::CommandExt as _; -async fn read_pipeline() -> crate::parse::Pipeline { - let mut r = unsafe { async_std::fs::File::from_raw_fd(3) }; - let mut s = String::new(); - r.read_to_string(&mut s).await.unwrap(); - crate::parse::Pipeline::parse(&s).unwrap() -} +const PID0: nix::unistd::Pid = nix::unistd::Pid::from_raw(0); -pub async fn run() { - let pipeline = read_pipeline().await; +pub fn run() -> anyhow::Result { + let pipeline = read_pipeline()?; + let mut cmds: Vec<_> = pipeline + .exes() + .iter() + .map(|exe| { + let mut cmd = std::process::Command::new(exe.exe()); + cmd.args(exe.args()); + cmd + }) + .collect(); + for i in 0..(cmds.len() - 1) { + let (r, w) = pipe()?; + cmds[i].stdout(w); + cmds[i + 1].stdin(r); + } - let mut futures = futures_util::stream::FuturesUnordered::new(); - let mut pg = None; - let mut stdin = None; - let last = pipeline.exes().len() - 1; - for (i, exe) in pipeline.exes().iter().enumerate() { - let mut cmd = async_std::process::Command::new(exe.exe()); - cmd.args(exe.args()); - if let Some(stdin) = stdin { - cmd.stdin(unsafe { - async_std::process::Stdio::from_raw_fd(stdin) - }); - } - if i < last { - let (r, w) = - nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).unwrap(); - stdin = Some(r); - cmd.stdout(unsafe { async_std::process::Stdio::from_raw_fd(w) }); - } - let pg_pid = nix::unistd::Pid::from_raw(pg.unwrap_or(0)); + let mut children = vec![]; + + unsafe { + cmds[0].pre_exec(|| { + setpgid_child(PID0)?; + Ok(()) + }); + } + let leader = cmds[0].spawn()?; + let pg_pid = id_to_pid(leader.id()); + setpgid_parent(pg_pid, PID0)?; + set_foreground_pg(pg_pid)?; + children.push(leader); + + for cmd in &mut cmds[1..] { unsafe { cmd.pre_exec(move || { - nix::unistd::setpgid(nix::unistd::Pid::from_raw(0), pg_pid)?; + setpgid_child(pg_pid)?; Ok(()) }); } - let child = cmd.spawn().unwrap(); - let res = nix::unistd::setpgid( - nix::unistd::Pid::from_raw(child.id().try_into().unwrap()), - pg_pid, - ); - match res { - Ok(()) => {} - Err(e) => { - if e != nix::errno::Errno::EACCES { - res.unwrap(); + let child = cmd.spawn()?; + let child_pid = id_to_pid(child.id()); + children.push(child); + setpgid_parent(child_pid, pg_pid)?; + } + drop(cmds); + + let last_pid = id_to_pid(children[children.len() - 1].id()); + let mut children: std::collections::HashMap< + nix::unistd::Pid, + std::process::Child, + > = children + .into_iter() + .map(|child| (id_to_pid(child.id()), child)) + .collect(); + let mut final_code = None; + let mut final_signal = None; + while !children.is_empty() { + match nix::sys::wait::waitpid(neg_pid(pg_pid), None)? { + nix::sys::wait::WaitStatus::Exited(pid, code) => { + if pid == last_pid { + final_code = Some(code); } + children.remove(&pid); } + nix::sys::wait::WaitStatus::Signaled(pid, signal, _) => { + if signal == nix::sys::signal::Signal::SIGINT { + nix::sys::signal::raise(nix::sys::signal::SIGINT)?; + } + if pid == last_pid { + final_signal = Some(signal); + } + children.remove(&pid); + } + _ => {} } - if pg.is_none() { - pg = Some(child.id().try_into().unwrap()); - let pty = nix::fcntl::open( - "/dev/tty", - nix::fcntl::OFlag::empty(), - nix::sys::stat::Mode::empty(), - ) - .unwrap(); - nix::unistd::tcsetpgrp( - pty, - nix::unistd::Pid::from_raw(pg.unwrap()), - ) - .unwrap(); - nix::unistd::close(pty).unwrap(); - nix::sys::signal::kill( - nix::unistd::Pid::from_raw(-pg.unwrap()), - nix::sys::signal::Signal::SIGCONT, - ) - .unwrap(); - } - futures.push(async move { - (child.status_no_drop().await.unwrap(), i == last) - }); } + if let Some(signal) = final_signal { + nix::sys::signal::raise(signal)?; + } + Ok(final_code.unwrap()) +} - let mut final_status = None; - while let Some((status, last)) = futures.next().await { - if status.signal() == Some(signal_hook::consts::signal::SIGINT) { - nix::sys::signal::raise(nix::sys::signal::SIGINT).unwrap(); - } - if last { - final_status = Some(status); +fn read_pipeline() -> anyhow::Result { + // Safety: this code is only called by crate::history::run_pipeline, which + // passes data through on fd 3, and which will not spawn this process + // unless the pipe was successfully opened on that fd + let mut fd3 = unsafe { std::fs::File::from_raw_fd(3) }; + let mut pipeline = String::new(); + fd3.read_to_string(&mut pipeline)?; + let ast = crate::parse::Pipeline::parse(&pipeline)?; + Ok(ast) +} + +fn pipe() -> anyhow::Result<(std::fs::File, std::fs::File)> { + let (r, w) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC)?; + // Safety: these file descriptors were just returned by pipe2 above, which + // means they must be valid otherwise that call would have returned an + // error + Ok((unsafe { std::fs::File::from_raw_fd(r) }, unsafe { + std::fs::File::from_raw_fd(w) + })) +} + +fn set_foreground_pg(pg: nix::unistd::Pid) -> anyhow::Result<()> { + let pty = nix::fcntl::open( + "/dev/tty", + nix::fcntl::OFlag::empty(), + nix::sys::stat::Mode::empty(), + )?; + nix::unistd::tcsetpgrp(pty, pg)?; + nix::unistd::close(pty)?; + nix::sys::signal::kill(neg_pid(pg), nix::sys::signal::Signal::SIGCONT)?; + Ok(()) +} + +fn setpgid_child(pg: nix::unistd::Pid) -> std::io::Result<()> { + nix::unistd::setpgid(id_to_pid(0), pg)?; + Ok(()) +} + +fn setpgid_parent( + pid: nix::unistd::Pid, + pg: nix::unistd::Pid, +) -> anyhow::Result<()> { + nix::unistd::setpgid(pid, pg).or_else(|e| { + if e == nix::errno::Errno::EACCES { + Ok(()) + } else { + Err(e) } - } - if let Some(code) = final_status.unwrap().code() { - std::process::exit(code); - } else { - std::process::exit(255); - } + })?; + Ok(()) +} + +fn id_to_pid(id: u32) -> nix::unistd::Pid { + nix::unistd::Pid::from_raw(id.try_into().unwrap()) +} + +fn neg_pid(pid: nix::unistd::Pid) -> nix::unistd::Pid { + nix::unistd::Pid::from_raw(-pid.as_raw()) } -- cgit v1.2.3-54-g00ecf