summaryrefslogtreecommitdiffstats
path: root/src/runner/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/runner/mod.rs')
-rw-r--r--src/runner/mod.rs310
1 files changed, 120 insertions, 190 deletions
diff --git a/src/runner/mod.rs b/src/runner/mod.rs
index 01a87b9..91e268a 100644
--- a/src/runner/mod.rs
+++ b/src/runner/mod.rs
@@ -4,13 +4,12 @@ mod builtins;
mod command;
pub use command::{Child, Command};
mod prelude;
-
-const PID0: nix::unistd::Pid = nix::unistd::Pid::from_raw(0);
+mod sys;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum Event {
- RunPipeline(usize, (usize, usize)),
- Suspend(usize),
+ RunPipeline((usize, usize)),
+ Suspend,
Exit(Env),
}
@@ -69,12 +68,13 @@ enum Frame {
For(bool, usize, Vec<String>),
}
-pub async fn run(
- commands: &str,
- shell_write: Option<&async_std::fs::File>,
-) -> anyhow::Result<i32> {
+pub async fn main(
+ commands: String,
+ shell_write: &mut Option<tokio::fs::File>,
+) -> Result<i32> {
let mut env = Env::new_from_env()?;
- run_commands(commands, &mut env, shell_write).await?;
+ let config = crate::config::Config::load()?;
+ run_commands(commands, &mut env, &config, shell_write).await?;
let status = env.latest_status();
write_event(shell_write, Event::Exit(env)).await?;
@@ -85,11 +85,12 @@ pub async fn run(
}
async fn run_commands(
- commands: &str,
+ commands: String,
env: &mut Env,
- shell_write: Option<&async_std::fs::File>,
-) -> anyhow::Result<()> {
- let commands = crate::parse::ast::Commands::parse(commands)?;
+ config: &crate::config::Config,
+ shell_write: &mut Option<tokio::fs::File>,
+) -> Result<()> {
+ let commands = crate::parse::ast::Commands::parse(&commands)?;
let commands = commands.commands();
let mut pc = 0;
let mut stack = Stack::new();
@@ -97,7 +98,8 @@ async fn run_commands(
match &commands[pc] {
crate::parse::ast::Command::Pipeline(pipeline) => {
if stack.should_execute() {
- run_pipeline(pipeline.clone(), env, shell_write).await?;
+ run_pipeline(pipeline.clone(), env, config, shell_write)
+ .await?;
}
pc += 1;
}
@@ -108,7 +110,8 @@ async fn run_commands(
}
if should {
let status = env.latest_status();
- run_pipeline(pipeline.clone(), env, shell_write).await?;
+ run_pipeline(pipeline.clone(), env, config, shell_write)
+ .await?;
if let Some(Frame::If(should, found)) = stack.top_mut() {
*should = env.latest_status().success();
if *should {
@@ -128,7 +131,8 @@ async fn run_commands(
}
if should {
let status = env.latest_status();
- run_pipeline(pipeline.clone(), env, shell_write).await?;
+ run_pipeline(pipeline.clone(), env, config, shell_write)
+ .await?;
if let Some(Frame::While(should, _)) = stack.top_mut() {
*should = env.latest_status().success();
} else {
@@ -153,7 +157,7 @@ async fn run_commands(
.map(IntoIterator::into_iter)
})
.collect::<futures_util::stream::FuturesOrdered<_>>()
- .collect::<Result<Vec<_>, _>>().await?
+ .try_collect::<Vec<_>>().await?
.into_iter()
.flatten()
.collect()
@@ -188,8 +192,13 @@ async fn run_commands(
*should = false;
} else if let Some(pipeline) = pipeline {
let status = env.latest_status();
- run_pipeline(pipeline.clone(), env, shell_write)
- .await?;
+ run_pipeline(
+ pipeline.clone(),
+ env,
+ config,
+ shell_write,
+ )
+ .await?;
*should = env.latest_status().success();
if *should {
*found = true;
@@ -232,30 +241,54 @@ async fn run_commands(
async fn run_pipeline(
pipeline: crate::parse::ast::Pipeline,
env: &mut Env,
- shell_write: Option<&async_std::fs::File>,
-) -> anyhow::Result<()> {
- write_event(shell_write, Event::RunPipeline(env.idx(), pipeline.span()))
- .await?;
+ config: &crate::config::Config,
+ shell_write: &mut Option<tokio::fs::File>,
+) -> Result<()> {
+ write_event(shell_write, Event::RunPipeline(pipeline.span())).await?;
// Safety: pipelines are run serially, so only one copy of these will ever
// exist at once. note that reusing a single copy of these at the top
// level would not be safe, because in the case of a command line like
// "echo foo; ls", we would pass the stdout fd to the ls process while it
// is still open here, and may still have data buffered.
- let stdin = unsafe { async_std::fs::File::from_raw_fd(0) };
- let stdout = unsafe { async_std::fs::File::from_raw_fd(1) };
- let stderr = unsafe { async_std::fs::File::from_raw_fd(2) };
+ let stdin = unsafe { std::fs::File::from_raw_fd(0) };
+ let stdout = unsafe { std::fs::File::from_raw_fd(1) };
+ let stderr = unsafe { std::fs::File::from_raw_fd(2) };
let mut io = builtins::Io::new();
io.set_stdin(stdin);
io.set_stdout(stdout);
io.set_stderr(stderr);
let pwd = env.pwd().to_path_buf();
- let pipeline = pipeline.eval(env).await?;
let interactive = shell_write.is_some();
- let (children, pg) = spawn_children(pipeline, env, &io, interactive)?;
- let status = wait_children(children, pg, env, &io, shell_write).await;
+ let pipeline = pipeline.eval(env).await?;
+ let mut exes: Vec<_> = pipeline.into_exes().collect();
+ for exe in &mut exes {
+ let mut seen = std::collections::HashSet::new();
+ while let Some(alias) = config.alias_for(exe.exe()) {
+ let mut new = alias.clone().eval(env).await?;
+ let override_self = exe.exe() == new.exe();
+ if seen.contains(new.exe()) {
+ return Err(anyhow!(
+ "recursive alias found: {}",
+ new.exe().display()
+ ));
+ }
+ seen.insert(new.exe().to_path_buf());
+ new.append(exe.clone());
+ *exe = new;
+ if override_self {
+ break;
+ }
+ }
+ }
+ let cmds = exes
+ .into_iter()
+ .map(|exe| Command::new(exe, io.clone()))
+ .collect();
+ let (children, pg) = spawn_children(cmds, env, interactive)?;
+ let status = wait_children(children, pg, shell_write).await;
if interactive {
- set_foreground_pg(nix::unistd::getpid())?;
+ sys::set_foreground_pg(nix::unistd::getpid())?;
}
env.update()?;
env.set_status(status);
@@ -266,28 +299,23 @@ async fn run_pipeline(
}
async fn write_event(
- fh: Option<&async_std::fs::File>,
+ fh: &mut Option<tokio::fs::File>,
event: Event,
-) -> anyhow::Result<()> {
- if let Some(mut fh) = fh {
+) -> Result<()> {
+ if let Some(fh) = fh {
fh.write_all(&bincode::serialize(&event)?).await?;
fh.flush().await?;
}
Ok(())
}
-fn spawn_children<'a>(
- pipeline: crate::parse::Pipeline,
- env: &'a Env,
- io: &builtins::Io,
+fn spawn_children(
+ mut cmds: Vec<Command>,
+ env: &Env,
interactive: bool,
-) -> anyhow::Result<(Vec<Child<'a>>, Option<nix::unistd::Pid>)> {
- let mut cmds: Vec<_> = pipeline
- .into_exes()
- .map(|exe| Command::new(exe, io.clone()))
- .collect();
+) -> Result<(Vec<Child>, Option<nix::unistd::Pid>)> {
for i in 0..(cmds.len() - 1) {
- let (r, w) = pipe()?;
+ let (r, w) = sys::pipe()?;
cmds[i].stdout(w);
cmds[i + 1].stdin(r);
}
@@ -298,18 +326,18 @@ fn spawn_children<'a>(
// Safety: setpgid is an async-signal-safe function
unsafe {
cmd.pre_exec(move || {
- setpgid_child(pg_pid)?;
+ sys::setpgid_child(pg_pid)?;
Ok(())
});
}
let child = cmd.spawn(env)?;
if let Some(id) = child.id() {
- let child_pid = id_to_pid(id);
- setpgid_parent(child_pid, pg_pid)?;
+ let child_pid = sys::id_to_pid(id);
+ sys::setpgid_parent(child_pid, pg_pid)?;
if pg_pid.is_none() {
pg_pid = Some(child_pid);
if interactive {
- set_foreground_pg(child_pid)?;
+ sys::set_foreground_pg(child_pid)?;
}
}
}
@@ -319,24 +347,18 @@ fn spawn_children<'a>(
}
async fn wait_children(
- children: Vec<Child<'_>>,
+ children: Vec<Child>,
pg: Option<nix::unistd::Pid>,
- env: &Env,
- io: &builtins::Io,
- shell_write: Option<&async_std::fs::File>,
+ shell_write: &mut Option<tokio::fs::File>,
) -> std::process::ExitStatus {
enum Res {
Child(nix::Result<nix::sys::wait::WaitStatus>),
- Builtin(Option<(anyhow::Result<std::process::ExitStatus>, bool)>),
+ Builtin((Result<std::process::ExitStatus>, bool)),
}
macro_rules! bail {
($e:expr) => {
- // if writing to stderr is not possible, we still want to exit
- // normally with a failure exit code
- #[allow(clippy::let_underscore_drop)]
- let _ =
- io.write_stderr(format!("nbsh: {}\n", $e).as_bytes()).await;
+ eprintln!("nbsh: {}\n", $e);
return std::process::ExitStatus::from_raw(1 << 8);
};
}
@@ -351,10 +373,11 @@ async fn wait_children(
let mut children: std::collections::HashMap<_, _> = children
.into_iter()
.map(|(i, child)| {
- (id_to_pid(child.id().unwrap()), (child, i == count - 1))
+ (sys::id_to_pid(child.id().unwrap()), (child, i == count - 1))
})
.collect();
- let mut builtins: futures_util::stream::FuturesUnordered<_> =
+ let mut builtin_count = builtins.len();
+ let builtins: futures_util::stream::FuturesUnordered<_> =
builtins
.into_iter()
.map(|(i, child)| async move {
@@ -362,47 +385,40 @@ async fn wait_children(
})
.collect();
- let (wait_w, wait_r) = async_std::channel::unbounded();
- let new_wait = move || {
- if let Some(pg) = pg {
- let wait_w = wait_w.clone();
- async_std::task::spawn(async move {
- let res = blocking::unblock(move || {
- nix::sys::wait::waitpid(
- neg_pid(pg),
- Some(nix::sys::wait::WaitPidFlag::WUNTRACED),
- )
- })
- .await;
- if wait_w.is_closed() {
- // we shouldn't be able to drop real process terminations
+ let (wait_w, wait_r) = tokio::sync::mpsc::unbounded_channel();
+ if let Some(pg) = pg {
+ tokio::task::spawn_blocking(move || loop {
+ let res = nix::sys::wait::waitpid(
+ sys::neg_pid(pg),
+ Some(nix::sys::wait::WaitPidFlag::WUNTRACED),
+ );
+ match wait_w.send(res) {
+ Ok(_) => {}
+ Err(tokio::sync::mpsc::error::SendError(res)) => {
+ // we should never drop wait_r while there are still valid
+ // things to read
assert!(res.is_err());
- } else {
- wait_w.send(res).await.unwrap();
+ break;
}
- });
- }
- };
-
- new_wait();
- loop {
- if children.is_empty() && builtins.is_empty() {
- break;
- }
+ }
+ });
+ }
- let child = async { Res::Child(wait_r.recv().await.unwrap()) };
- let builtin = async {
- Res::Builtin(if builtins.is_empty() {
- std::future::pending().await
- } else {
- builtins.next().await
- })
- };
- match child.race(builtin).await {
+ let mut stream: futures_util::stream::SelectAll<_> = [
+ tokio_stream::wrappers::UnboundedReceiverStream::new(wait_r)
+ .map(Res::Child)
+ .boxed(),
+ builtins.map(Res::Builtin).boxed(),
+ ]
+ .into_iter()
+ .collect();
+ while let Some(res) = stream.next().await {
+ match res {
Res::Child(Ok(status)) => {
match status {
- // we can't call child.status() here to unify these branches
- // because our waitpid call already collected the status
+ // we can't call child.status() here to unify these
+ // branches because our waitpid call already collected the
+ // status
nix::sys::wait::WaitStatus::Exited(pid, code) => {
let (_, last) = children.remove(&pid).unwrap();
if last {
@@ -432,11 +448,8 @@ async fn wait_children(
}
nix::sys::wait::WaitStatus::Stopped(pid, signal) => {
if signal == nix::sys::signal::Signal::SIGTSTP {
- if let Err(e) = write_event(
- shell_write,
- Event::Suspend(env.idx()),
- )
- .await
+ if let Err(e) =
+ write_event(shell_write, Event::Suspend).await
{
bail!(e);
}
@@ -450,12 +463,11 @@ async fn wait_children(
}
_ => {}
}
- new_wait();
}
Res::Child(Err(e)) => {
bail!(e);
}
- Res::Builtin(Some((Ok(status), last))) => {
+ Res::Builtin((Ok(status), last)) => {
// this conversion is safe because the Signal enum is
// repr(i32)
#[allow(clippy::as_conversions)]
@@ -471,99 +483,17 @@ async fn wait_children(
if last {
final_status = Some(status);
}
+ builtin_count -= 1;
}
- Res::Builtin(Some((Err(e), _))) => {
+ Res::Builtin((Err(e), _)) => {
bail!(e);
}
- Res::Builtin(None) => {}
}
- }
- final_status.unwrap()
-}
-
-fn pipe() -> anyhow::Result<(std::fs::File, std::fs::File)> {
- let (r, w) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC)?;
- // Safety: these file descriptors were just returned by pipe2 above, and
- // are only available in this function, so nothing else can be accessing
- // them
- Ok((unsafe { std::fs::File::from_raw_fd(r) }, unsafe {
- std::fs::File::from_raw_fd(w)
- }))
-}
-
-fn set_foreground_pg(pg: nix::unistd::Pid) -> anyhow::Result<()> {
- let pty = nix::fcntl::open(
- "/dev/tty",
- nix::fcntl::OFlag::empty(),
- nix::sys::stat::Mode::empty(),
- )?;
-
- // if a background process calls tcsetpgrp, the kernel will send it
- // SIGTTOU which suspends it. if that background process is the session
- // leader and doesn't have SIGTTOU blocked, the kernel will instead just
- // return ENOTTY from the tcsetpgrp call rather than sending a signal to
- // avoid deadlocking the process. therefore, we need to ensure that
- // SIGTTOU is blocked here.
-
- // Safety: setting a signal handler to SigIgn is always safe
- unsafe {
- nix::sys::signal::signal(
- nix::sys::signal::Signal::SIGTTOU,
- nix::sys::signal::SigHandler::SigIgn,
- )?;
- }
- let res = nix::unistd::tcsetpgrp(pty, pg);
- // Safety: setting a signal handler to SigDfl is always safe
- unsafe {
- nix::sys::signal::signal(
- nix::sys::signal::Signal::SIGTTOU,
- nix::sys::signal::SigHandler::SigDfl,
- )?;
- }
- res?;
-
- nix::unistd::close(pty)?;
-
- nix::sys::signal::kill(neg_pid(pg), nix::sys::signal::Signal::SIGCONT)
- .or_else(|e| {
- // the process group has already exited
- if e == nix::errno::Errno::ESRCH {
- Ok(())
- } else {
- Err(e)
- }
- })?;
-
- Ok(())
-}
-
-fn setpgid_child(pg: Option<nix::unistd::Pid>) -> std::io::Result<()> {
- nix::unistd::setpgid(PID0, pg.unwrap_or(PID0))?;
- Ok(())
-}
-
-fn setpgid_parent(
- pid: nix::unistd::Pid,
- pg: Option<nix::unistd::Pid>,
-) -> anyhow::Result<()> {
- nix::unistd::setpgid(pid, pg.unwrap_or(PID0)).or_else(|e| {
- // EACCES means that the child already called exec, but if it did,
- // then it also must have already called setpgid itself, so we don't
- // care. ESRCH means that the process already exited, which is similar
- if e == nix::errno::Errno::EACCES || e == nix::errno::Errno::ESRCH {
- Ok(())
- } else {
- Err(e)
+ if children.is_empty() && builtin_count == 0 {
+ break;
}
- })?;
- Ok(())
-}
-
-fn id_to_pid(id: u32) -> nix::unistd::Pid {
- nix::unistd::Pid::from_raw(id.try_into().unwrap())
-}
+ }
-fn neg_pid(pid: nix::unistd::Pid) -> nix::unistd::Pid {
- nix::unistd::Pid::from_raw(-pid.as_raw())
+ final_status.unwrap()
}