diff options
Diffstat (limited to 'src/runner/mod.rs')
-rw-r--r-- | src/runner/mod.rs | 310 |
1 files changed, 120 insertions, 190 deletions
diff --git a/src/runner/mod.rs b/src/runner/mod.rs index 01a87b9..91e268a 100644 --- a/src/runner/mod.rs +++ b/src/runner/mod.rs @@ -4,13 +4,12 @@ mod builtins; mod command; pub use command::{Child, Command}; mod prelude; - -const PID0: nix::unistd::Pid = nix::unistd::Pid::from_raw(0); +mod sys; #[derive(Debug, serde::Serialize, serde::Deserialize)] pub enum Event { - RunPipeline(usize, (usize, usize)), - Suspend(usize), + RunPipeline((usize, usize)), + Suspend, Exit(Env), } @@ -69,12 +68,13 @@ enum Frame { For(bool, usize, Vec<String>), } -pub async fn run( - commands: &str, - shell_write: Option<&async_std::fs::File>, -) -> anyhow::Result<i32> { +pub async fn main( + commands: String, + shell_write: &mut Option<tokio::fs::File>, +) -> Result<i32> { let mut env = Env::new_from_env()?; - run_commands(commands, &mut env, shell_write).await?; + let config = crate::config::Config::load()?; + run_commands(commands, &mut env, &config, shell_write).await?; let status = env.latest_status(); write_event(shell_write, Event::Exit(env)).await?; @@ -85,11 +85,12 @@ pub async fn run( } async fn run_commands( - commands: &str, + commands: String, env: &mut Env, - shell_write: Option<&async_std::fs::File>, -) -> anyhow::Result<()> { - let commands = crate::parse::ast::Commands::parse(commands)?; + config: &crate::config::Config, + shell_write: &mut Option<tokio::fs::File>, +) -> Result<()> { + let commands = crate::parse::ast::Commands::parse(&commands)?; let commands = commands.commands(); let mut pc = 0; let mut stack = Stack::new(); @@ -97,7 +98,8 @@ async fn run_commands( match &commands[pc] { crate::parse::ast::Command::Pipeline(pipeline) => { if stack.should_execute() { - run_pipeline(pipeline.clone(), env, shell_write).await?; + run_pipeline(pipeline.clone(), env, config, shell_write) + .await?; } pc += 1; } @@ -108,7 +110,8 @@ async fn run_commands( } if should { let status = env.latest_status(); - run_pipeline(pipeline.clone(), env, shell_write).await?; + run_pipeline(pipeline.clone(), env, config, shell_write) + .await?; if let Some(Frame::If(should, found)) = stack.top_mut() { *should = env.latest_status().success(); if *should { @@ -128,7 +131,8 @@ async fn run_commands( } if should { let status = env.latest_status(); - run_pipeline(pipeline.clone(), env, shell_write).await?; + run_pipeline(pipeline.clone(), env, config, shell_write) + .await?; if let Some(Frame::While(should, _)) = stack.top_mut() { *should = env.latest_status().success(); } else { @@ -153,7 +157,7 @@ async fn run_commands( .map(IntoIterator::into_iter) }) .collect::<futures_util::stream::FuturesOrdered<_>>() - .collect::<Result<Vec<_>, _>>().await? + .try_collect::<Vec<_>>().await? .into_iter() .flatten() .collect() @@ -188,8 +192,13 @@ async fn run_commands( *should = false; } else if let Some(pipeline) = pipeline { let status = env.latest_status(); - run_pipeline(pipeline.clone(), env, shell_write) - .await?; + run_pipeline( + pipeline.clone(), + env, + config, + shell_write, + ) + .await?; *should = env.latest_status().success(); if *should { *found = true; @@ -232,30 +241,54 @@ async fn run_commands( async fn run_pipeline( pipeline: crate::parse::ast::Pipeline, env: &mut Env, - shell_write: Option<&async_std::fs::File>, -) -> anyhow::Result<()> { - write_event(shell_write, Event::RunPipeline(env.idx(), pipeline.span())) - .await?; + config: &crate::config::Config, + shell_write: &mut Option<tokio::fs::File>, +) -> Result<()> { + write_event(shell_write, Event::RunPipeline(pipeline.span())).await?; // Safety: pipelines are run serially, so only one copy of these will ever // exist at once. note that reusing a single copy of these at the top // level would not be safe, because in the case of a command line like // "echo foo; ls", we would pass the stdout fd to the ls process while it // is still open here, and may still have data buffered. - let stdin = unsafe { async_std::fs::File::from_raw_fd(0) }; - let stdout = unsafe { async_std::fs::File::from_raw_fd(1) }; - let stderr = unsafe { async_std::fs::File::from_raw_fd(2) }; + let stdin = unsafe { std::fs::File::from_raw_fd(0) }; + let stdout = unsafe { std::fs::File::from_raw_fd(1) }; + let stderr = unsafe { std::fs::File::from_raw_fd(2) }; let mut io = builtins::Io::new(); io.set_stdin(stdin); io.set_stdout(stdout); io.set_stderr(stderr); let pwd = env.pwd().to_path_buf(); - let pipeline = pipeline.eval(env).await?; let interactive = shell_write.is_some(); - let (children, pg) = spawn_children(pipeline, env, &io, interactive)?; - let status = wait_children(children, pg, env, &io, shell_write).await; + let pipeline = pipeline.eval(env).await?; + let mut exes: Vec<_> = pipeline.into_exes().collect(); + for exe in &mut exes { + let mut seen = std::collections::HashSet::new(); + while let Some(alias) = config.alias_for(exe.exe()) { + let mut new = alias.clone().eval(env).await?; + let override_self = exe.exe() == new.exe(); + if seen.contains(new.exe()) { + return Err(anyhow!( + "recursive alias found: {}", + new.exe().display() + )); + } + seen.insert(new.exe().to_path_buf()); + new.append(exe.clone()); + *exe = new; + if override_self { + break; + } + } + } + let cmds = exes + .into_iter() + .map(|exe| Command::new(exe, io.clone())) + .collect(); + let (children, pg) = spawn_children(cmds, env, interactive)?; + let status = wait_children(children, pg, shell_write).await; if interactive { - set_foreground_pg(nix::unistd::getpid())?; + sys::set_foreground_pg(nix::unistd::getpid())?; } env.update()?; env.set_status(status); @@ -266,28 +299,23 @@ async fn run_pipeline( } async fn write_event( - fh: Option<&async_std::fs::File>, + fh: &mut Option<tokio::fs::File>, event: Event, -) -> anyhow::Result<()> { - if let Some(mut fh) = fh { +) -> Result<()> { + if let Some(fh) = fh { fh.write_all(&bincode::serialize(&event)?).await?; fh.flush().await?; } Ok(()) } -fn spawn_children<'a>( - pipeline: crate::parse::Pipeline, - env: &'a Env, - io: &builtins::Io, +fn spawn_children( + mut cmds: Vec<Command>, + env: &Env, interactive: bool, -) -> anyhow::Result<(Vec<Child<'a>>, Option<nix::unistd::Pid>)> { - let mut cmds: Vec<_> = pipeline - .into_exes() - .map(|exe| Command::new(exe, io.clone())) - .collect(); +) -> Result<(Vec<Child>, Option<nix::unistd::Pid>)> { for i in 0..(cmds.len() - 1) { - let (r, w) = pipe()?; + let (r, w) = sys::pipe()?; cmds[i].stdout(w); cmds[i + 1].stdin(r); } @@ -298,18 +326,18 @@ fn spawn_children<'a>( // Safety: setpgid is an async-signal-safe function unsafe { cmd.pre_exec(move || { - setpgid_child(pg_pid)?; + sys::setpgid_child(pg_pid)?; Ok(()) }); } let child = cmd.spawn(env)?; if let Some(id) = child.id() { - let child_pid = id_to_pid(id); - setpgid_parent(child_pid, pg_pid)?; + let child_pid = sys::id_to_pid(id); + sys::setpgid_parent(child_pid, pg_pid)?; if pg_pid.is_none() { pg_pid = Some(child_pid); if interactive { - set_foreground_pg(child_pid)?; + sys::set_foreground_pg(child_pid)?; } } } @@ -319,24 +347,18 @@ fn spawn_children<'a>( } async fn wait_children( - children: Vec<Child<'_>>, + children: Vec<Child>, pg: Option<nix::unistd::Pid>, - env: &Env, - io: &builtins::Io, - shell_write: Option<&async_std::fs::File>, + shell_write: &mut Option<tokio::fs::File>, ) -> std::process::ExitStatus { enum Res { Child(nix::Result<nix::sys::wait::WaitStatus>), - Builtin(Option<(anyhow::Result<std::process::ExitStatus>, bool)>), + Builtin((Result<std::process::ExitStatus>, bool)), } macro_rules! bail { ($e:expr) => { - // if writing to stderr is not possible, we still want to exit - // normally with a failure exit code - #[allow(clippy::let_underscore_drop)] - let _ = - io.write_stderr(format!("nbsh: {}\n", $e).as_bytes()).await; + eprintln!("nbsh: {}\n", $e); return std::process::ExitStatus::from_raw(1 << 8); }; } @@ -351,10 +373,11 @@ async fn wait_children( let mut children: std::collections::HashMap<_, _> = children .into_iter() .map(|(i, child)| { - (id_to_pid(child.id().unwrap()), (child, i == count - 1)) + (sys::id_to_pid(child.id().unwrap()), (child, i == count - 1)) }) .collect(); - let mut builtins: futures_util::stream::FuturesUnordered<_> = + let mut builtin_count = builtins.len(); + let builtins: futures_util::stream::FuturesUnordered<_> = builtins .into_iter() .map(|(i, child)| async move { @@ -362,47 +385,40 @@ async fn wait_children( }) .collect(); - let (wait_w, wait_r) = async_std::channel::unbounded(); - let new_wait = move || { - if let Some(pg) = pg { - let wait_w = wait_w.clone(); - async_std::task::spawn(async move { - let res = blocking::unblock(move || { - nix::sys::wait::waitpid( - neg_pid(pg), - Some(nix::sys::wait::WaitPidFlag::WUNTRACED), - ) - }) - .await; - if wait_w.is_closed() { - // we shouldn't be able to drop real process terminations + let (wait_w, wait_r) = tokio::sync::mpsc::unbounded_channel(); + if let Some(pg) = pg { + tokio::task::spawn_blocking(move || loop { + let res = nix::sys::wait::waitpid( + sys::neg_pid(pg), + Some(nix::sys::wait::WaitPidFlag::WUNTRACED), + ); + match wait_w.send(res) { + Ok(_) => {} + Err(tokio::sync::mpsc::error::SendError(res)) => { + // we should never drop wait_r while there are still valid + // things to read assert!(res.is_err()); - } else { - wait_w.send(res).await.unwrap(); + break; } - }); - } - }; - - new_wait(); - loop { - if children.is_empty() && builtins.is_empty() { - break; - } + } + }); + } - let child = async { Res::Child(wait_r.recv().await.unwrap()) }; - let builtin = async { - Res::Builtin(if builtins.is_empty() { - std::future::pending().await - } else { - builtins.next().await - }) - }; - match child.race(builtin).await { + let mut stream: futures_util::stream::SelectAll<_> = [ + tokio_stream::wrappers::UnboundedReceiverStream::new(wait_r) + .map(Res::Child) + .boxed(), + builtins.map(Res::Builtin).boxed(), + ] + .into_iter() + .collect(); + while let Some(res) = stream.next().await { + match res { Res::Child(Ok(status)) => { match status { - // we can't call child.status() here to unify these branches - // because our waitpid call already collected the status + // we can't call child.status() here to unify these + // branches because our waitpid call already collected the + // status nix::sys::wait::WaitStatus::Exited(pid, code) => { let (_, last) = children.remove(&pid).unwrap(); if last { @@ -432,11 +448,8 @@ async fn wait_children( } nix::sys::wait::WaitStatus::Stopped(pid, signal) => { if signal == nix::sys::signal::Signal::SIGTSTP { - if let Err(e) = write_event( - shell_write, - Event::Suspend(env.idx()), - ) - .await + if let Err(e) = + write_event(shell_write, Event::Suspend).await { bail!(e); } @@ -450,12 +463,11 @@ async fn wait_children( } _ => {} } - new_wait(); } Res::Child(Err(e)) => { bail!(e); } - Res::Builtin(Some((Ok(status), last))) => { + Res::Builtin((Ok(status), last)) => { // this conversion is safe because the Signal enum is // repr(i32) #[allow(clippy::as_conversions)] @@ -471,99 +483,17 @@ async fn wait_children( if last { final_status = Some(status); } + builtin_count -= 1; } - Res::Builtin(Some((Err(e), _))) => { + Res::Builtin((Err(e), _)) => { bail!(e); } - Res::Builtin(None) => {} } - } - final_status.unwrap() -} - -fn pipe() -> anyhow::Result<(std::fs::File, std::fs::File)> { - let (r, w) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC)?; - // Safety: these file descriptors were just returned by pipe2 above, and - // are only available in this function, so nothing else can be accessing - // them - Ok((unsafe { std::fs::File::from_raw_fd(r) }, unsafe { - std::fs::File::from_raw_fd(w) - })) -} - -fn set_foreground_pg(pg: nix::unistd::Pid) -> anyhow::Result<()> { - let pty = nix::fcntl::open( - "/dev/tty", - nix::fcntl::OFlag::empty(), - nix::sys::stat::Mode::empty(), - )?; - - // if a background process calls tcsetpgrp, the kernel will send it - // SIGTTOU which suspends it. if that background process is the session - // leader and doesn't have SIGTTOU blocked, the kernel will instead just - // return ENOTTY from the tcsetpgrp call rather than sending a signal to - // avoid deadlocking the process. therefore, we need to ensure that - // SIGTTOU is blocked here. - - // Safety: setting a signal handler to SigIgn is always safe - unsafe { - nix::sys::signal::signal( - nix::sys::signal::Signal::SIGTTOU, - nix::sys::signal::SigHandler::SigIgn, - )?; - } - let res = nix::unistd::tcsetpgrp(pty, pg); - // Safety: setting a signal handler to SigDfl is always safe - unsafe { - nix::sys::signal::signal( - nix::sys::signal::Signal::SIGTTOU, - nix::sys::signal::SigHandler::SigDfl, - )?; - } - res?; - - nix::unistd::close(pty)?; - - nix::sys::signal::kill(neg_pid(pg), nix::sys::signal::Signal::SIGCONT) - .or_else(|e| { - // the process group has already exited - if e == nix::errno::Errno::ESRCH { - Ok(()) - } else { - Err(e) - } - })?; - - Ok(()) -} - -fn setpgid_child(pg: Option<nix::unistd::Pid>) -> std::io::Result<()> { - nix::unistd::setpgid(PID0, pg.unwrap_or(PID0))?; - Ok(()) -} - -fn setpgid_parent( - pid: nix::unistd::Pid, - pg: Option<nix::unistd::Pid>, -) -> anyhow::Result<()> { - nix::unistd::setpgid(pid, pg.unwrap_or(PID0)).or_else(|e| { - // EACCES means that the child already called exec, but if it did, - // then it also must have already called setpgid itself, so we don't - // care. ESRCH means that the process already exited, which is similar - if e == nix::errno::Errno::EACCES || e == nix::errno::Errno::ESRCH { - Ok(()) - } else { - Err(e) + if children.is_empty() && builtin_count == 0 { + break; } - })?; - Ok(()) -} - -fn id_to_pid(id: u32) -> nix::unistd::Pid { - nix::unistd::Pid::from_raw(id.try_into().unwrap()) -} + } -fn neg_pid(pid: nix::unistd::Pid) -> nix::unistd::Pid { - nix::unistd::Pid::from_raw(-pid.as_raw()) + final_status.unwrap() } |