From 07ed5629fff12ccb95216f7b07c91bb6e8bbfbc7 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Sat, 26 Feb 2022 15:44:13 -0500 Subject: remove the mutex for builtin fds --- src/prelude.rs | 2 + src/runner/builtins/command.rs | 148 ++++++++++++++++++----------------------- src/runner/builtins/mod.rs | 80 ++++++---------------- src/runner/command.rs | 9 ++- src/runner/mod.rs | 17 ++--- 5 files changed, 94 insertions(+), 162 deletions(-) (limited to 'src') diff --git a/src/prelude.rs b/src/prelude.rs index 9c14a4b..9fe5992 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -1,5 +1,7 @@ pub use crate::env::Env; +pub use std::io::{Read as _, Write as _}; + pub use futures_util::future::FutureExt as _; pub use futures_util::stream::StreamExt as _; pub use futures_util::stream::TryStreamExt as _; diff --git a/src/runner/builtins/command.rs b/src/runner/builtins/command.rs index e0e1853..85f6594 100644 --- a/src/runner/builtins/command.rs +++ b/src/runner/builtins/command.rs @@ -38,8 +38,8 @@ impl Command { self.cfg.io.set_stderr(fh); } - // Safety: see pre_exec in async_std::os::unix::process::CommandExt (this - // is just a wrapper) + // Safety: see pre_exec in tokio::process::Command (this is just a + // wrapper) pub unsafe fn pre_exec(&mut self, f: F) where F: 'static + FnMut() -> std::io::Result<()> + Send + Sync, @@ -73,8 +73,8 @@ impl Cfg { &self.io } - // Safety: see pre_exec in async_std::os::unix::process::CommandExt (this - // is just a wrapper) + // Safety: see pre_exec in tokio::process::Command (this is just a + // wrapper) pub unsafe fn pre_exec(&mut self, f: F) where F: 'static + FnMut() -> std::io::Result<()> + Send + Sync, @@ -97,7 +97,7 @@ impl Cfg { pub struct Io { fds: std::collections::HashMap< std::os::unix::io::RawFd, - std::sync::Arc>, + std::sync::Arc, >, } @@ -108,7 +108,7 @@ impl Io { } } - fn stdin(&self) -> Option>> { + fn stdin(&self) -> Option> { self.fds.get(&0).map(std::sync::Arc::clone) } @@ -120,13 +120,11 @@ impl Io { 0, // Safety: we just acquired stdin via into_raw_fd, which acquires // ownership of the fd, so we are now the sole owner - std::sync::Arc::new(tokio::sync::Mutex::new(unsafe { - File::input(stdin.into_raw_fd()) - })), + std::sync::Arc::new(unsafe { File::input(stdin.into_raw_fd()) }), ); } - fn stdout(&self) -> Option>> { + fn stdout(&self) -> Option> { self.fds.get(&1).map(std::sync::Arc::clone) } @@ -138,13 +136,13 @@ impl Io { 1, // Safety: we just acquired stdout via into_raw_fd, which acquires // ownership of the fd, so we are now the sole owner - std::sync::Arc::new(tokio::sync::Mutex::new(unsafe { + std::sync::Arc::new(unsafe { File::output(stdout.into_raw_fd()) - })), + }), ); } - fn stderr(&self) -> Option>> { + fn stderr(&self) -> Option> { self.fds.get(&2).map(std::sync::Arc::clone) } @@ -156,9 +154,9 @@ impl Io { 2, // Safety: we just acquired stderr via into_raw_fd, which acquires // ownership of the fd, so we are now the sole owner - std::sync::Arc::new(tokio::sync::Mutex::new(unsafe { + std::sync::Arc::new(unsafe { File::output(stderr.into_raw_fd()) - })), + }), ); } @@ -174,17 +172,13 @@ impl Io { crate::parse::Direction::In => { // Safety: we just opened fd, and nothing else has // or can use it - std::sync::Arc::new(tokio::sync::Mutex::new( - unsafe { File::input(fd) }, - )) + std::sync::Arc::new(unsafe { File::input(fd) }) } crate::parse::Direction::Out | crate::parse::Direction::Append => { // Safety: we just opened fd, and nothing else has // or can use it - std::sync::Arc::new(tokio::sync::Mutex::new( - unsafe { File::output(fd) }, - )) + std::sync::Arc::new(unsafe { File::output(fd) }) } } } @@ -193,10 +187,10 @@ impl Io { } } - pub async fn read_line_stdin(&self) -> anyhow::Result<(String, bool)> { - let mut buf = vec![]; - if let Some(fh) = self.stdin() { - if let File::In(fh) = &mut *fh.clone().lock_owned().await { + pub fn read_line_stdin(&self) -> anyhow::Result<(String, bool)> { + let mut line = vec![]; + if let Some(file) = self.stdin() { + if let File::In(fh) = &*file { // we have to read only a single character at a time here // because stdin needs to be shared across all commands in the // command list, some of which may be builtins and others of @@ -205,26 +199,27 @@ impl Io { // no longer be available to the next command, since we have // them buffered in memory rather than them being on the stdin // pipe. - while let Ok(byte) = fh.read_u8().await { - buf.push(byte); + for byte in fh.bytes() { + let byte = byte?; + line.push(byte); if byte == b'\n' { break; } } } } - let done = buf.is_empty(); - let mut buf = String::from_utf8(buf).unwrap(); - if buf.ends_with('\n') { - buf.truncate(buf.len() - 1); + let done = line.is_empty(); + let mut line = String::from_utf8(line).unwrap(); + if line.ends_with('\n') { + line.truncate(line.len() - 1); } - Ok((buf, done)) + Ok((line, done)) } - pub async fn write_stdout(&self, buf: &[u8]) -> anyhow::Result<()> { - if let Some(fh) = self.stdout() { - if let File::Out(fh) = &mut *fh.clone().lock_owned().await { - Ok(fh.write_all(buf).await.map(|_| ())?) + pub fn write_stdout(&self, buf: &[u8]) -> anyhow::Result<()> { + if let Some(file) = self.stdout() { + if let File::Out(fh) = &*file { + Ok((&*fh).write_all(buf)?) } else { Ok(()) } @@ -233,10 +228,10 @@ impl Io { } } - pub async fn write_stderr(&self, buf: &[u8]) -> anyhow::Result<()> { - if let Some(fh) = self.stderr() { - if let File::Out(fh) = &mut *fh.clone().lock_owned().await { - Ok(fh.write_all(buf).await.map(|_| ())?) + pub fn write_stderr(&self, buf: &[u8]) -> anyhow::Result<()> { + if let Some(file) = self.stderr() { + if let File::Out(fh) = &*file { + Ok((&*fh).write_all(buf)?) } else { Ok(()) } @@ -248,7 +243,7 @@ impl Io { pub fn setup_command(mut self, cmd: &mut crate::runner::Command) { if let Some(stdin) = self.fds.remove(&0) { if let Ok(stdin) = std::sync::Arc::try_unwrap(stdin) { - let stdin = stdin.into_inner().into_raw_fd(); + let stdin = stdin.into_raw_fd(); if stdin != 0 { // Safety: we just acquired stdin via into_raw_fd, which // acquires ownership of the fd, so we are now the sole @@ -260,7 +255,7 @@ impl Io { } if let Some(stdout) = self.fds.remove(&1) { if let Ok(stdout) = std::sync::Arc::try_unwrap(stdout) { - let stdout = stdout.into_inner().into_raw_fd(); + let stdout = stdout.into_raw_fd(); if stdout != 1 { // Safety: we just acquired stdout via into_raw_fd, which // acquires ownership of the fd, so we are now the sole @@ -272,7 +267,7 @@ impl Io { } if let Some(stderr) = self.fds.remove(&2) { if let Ok(stderr) = std::sync::Arc::try_unwrap(stderr) { - let stderr = stderr.into_inner().into_raw_fd(); + let stderr = stderr.into_raw_fd(); if stderr != 2 { // Safety: we just acquired stderr via into_raw_fd, which // acquires ownership of the fd, so we are now the sole @@ -295,24 +290,23 @@ impl Drop for Io { #[derive(Debug)] pub enum File { - In(tokio::fs::File), - Out(tokio::fs::File), + In(std::fs::File), + Out(std::fs::File), } impl File { // Safety: fd must not be owned by any other File object pub unsafe fn input(fd: std::os::unix::io::RawFd) -> Self { - Self::In(tokio::fs::File::from_raw_fd(fd)) + Self::In(std::fs::File::from_raw_fd(fd)) } // Safety: fd must not be owned by any other File object pub unsafe fn output(fd: std::os::unix::io::RawFd) -> Self { - Self::Out(tokio::fs::File::from_raw_fd(fd)) + Self::Out(std::fs::File::from_raw_fd(fd)) } - fn maybe_drop(file: std::sync::Arc>) { + fn maybe_drop(file: std::sync::Arc) { if let Ok(file) = std::sync::Arc::try_unwrap(file) { - let file = file.into_inner(); if file.as_raw_fd() <= 2 { let _ = file.into_raw_fd(); } @@ -331,49 +325,33 @@ impl std::os::unix::io::AsRawFd for File { impl std::os::unix::io::IntoRawFd for File { fn into_raw_fd(self) -> std::os::unix::io::RawFd { match self { - Self::In(fh) | Self::Out(fh) => { - // XXX - fh.try_into_std().unwrap().into_raw_fd() - } + Self::In(fh) | Self::Out(fh) => fh.into_raw_fd(), } } } -pub struct Child<'a> { - fut: std::pin::Pin< - Box< - dyn std::future::Future - + Sync - + Send - + 'a, - >, - >, - wrapped_child: Option>>, +pub enum Child { + Task(tokio::task::JoinHandle), + Wrapped(Box), } -impl<'a> Child<'a> { - pub fn new_fut(fut: F) -> Self +impl Child { + pub fn new_task(f: F) -> Self where - F: std::future::Future - + Sync - + Send - + 'a, + F: FnOnce() -> std::process::ExitStatus + Send + 'static, { - Self { - fut: Box::pin(fut), - wrapped_child: None, - } + Self::Task(tokio::task::spawn_blocking(f)) } - pub fn new_wrapped(child: crate::runner::Child<'a>) -> Self { - Self { - fut: Box::pin(async move { unreachable!() }), - wrapped_child: Some(Box::new(child)), - } + pub fn new_wrapped(child: crate::runner::Child) -> Self { + Self::Wrapped(Box::new(child)) } pub fn id(&self) -> Option { - self.wrapped_child.as_ref().and_then(|cmd| cmd.id()) + match self { + Self::Task(_) => None, + Self::Wrapped(child) => child.id(), + } } pub fn status( @@ -383,15 +361,15 @@ impl<'a> Child<'a> { dyn std::future::Future< Output = anyhow::Result, > + Send - + Sync - + 'a, + + Sync, >, > { Box::pin(async move { - if let Some(child) = self.wrapped_child { - child.status().await - } else { - Ok(self.fut.await) + match self { + Self::Task(task) => { + task.await.map_err(|e| anyhow::anyhow!(e)) + } + Self::Wrapped(child) => child.status().await, } }) } diff --git a/src/runner/builtins/mod.rs b/src/runner/builtins/mod.rs index 87b5ae7..f212496 100644 --- a/src/runner/builtins/mod.rs +++ b/src/runner/builtins/mod.rs @@ -7,7 +7,7 @@ type Builtin = &'static (dyn for<'a> Fn( crate::parse::Exe, &'a Env, command::Cfg, -) -> anyhow::Result> +) -> anyhow::Result + Sync + Send); @@ -33,7 +33,6 @@ macro_rules! bail { $cfg.io().write_stderr( format!("{}: {}\n", $exe.exe().display(), $msg).as_bytes() ) - .await .unwrap(); return std::process::ExitStatus::from_raw(1 << 8); }; @@ -41,12 +40,10 @@ macro_rules! bail { $cfg.io().write_stderr( format!("{}: ", $exe.exe().display()).as_bytes() ) - .await .unwrap(); $cfg.io().write_stderr(format!($msg, $($arg)*).as_bytes()) - .await .unwrap(); - $cfg.io().write_stderr(b"\n").await.unwrap(); + $cfg.io().write_stderr(b"\n").unwrap(); return std::process::ExitStatus::from_raw(1 << 8); }; } @@ -58,21 +55,19 @@ fn cd( env: &Env, cfg: command::Cfg, ) -> anyhow::Result { - async fn async_cd( - exe: crate::parse::Exe, - env: &Env, - cfg: command::Cfg, - ) -> std::process::ExitStatus { + let prev_pwd = env.prev_pwd(); + let home = env.var("HOME"); + Ok(command::Child::new_task(move || { let dir = if let Some(dir) = exe.args().get(0) { if dir.is_empty() { ".".to_string().into() } else if dir == "-" { - env.prev_pwd() + prev_pwd } else { dir.into() } } else { - let dir = env.var("HOME"); + let dir = home; if let Some(dir) = dir { dir.into() } else { @@ -89,24 +84,16 @@ fn cd( ); } std::process::ExitStatus::from_raw(0) - } - - Ok(command::Child::new_fut(async move { - async_cd(exe, env, cfg).await })) } #[allow(clippy::unnecessary_wraps)] fn set( exe: crate::parse::Exe, - env: &Env, + _env: &Env, cfg: command::Cfg, ) -> anyhow::Result { - async fn async_set( - exe: crate::parse::Exe, - _env: &Env, - cfg: command::Cfg, - ) -> std::process::ExitStatus { + Ok(command::Child::new_task(move || { let k = if let Some(k) = exe.args().get(0).map(String::as_str) { k } else { @@ -120,24 +107,16 @@ fn set( std::env::set_var(k, v); std::process::ExitStatus::from_raw(0) - } - - Ok(command::Child::new_fut(async move { - async_set(exe, env, cfg).await })) } #[allow(clippy::unnecessary_wraps)] fn unset( exe: crate::parse::Exe, - env: &Env, + _env: &Env, cfg: command::Cfg, ) -> anyhow::Result { - async fn async_unset( - exe: crate::parse::Exe, - _env: &Env, - cfg: command::Cfg, - ) -> std::process::ExitStatus { + Ok(command::Child::new_task(move || { let k = if let Some(k) = exe.args().get(0).map(String::as_str) { k } else { @@ -146,10 +125,6 @@ fn unset( std::env::remove_var(k); std::process::ExitStatus::from_raw(0) - } - - Ok(command::Child::new_fut(async move { - async_unset(exe, env, cfg).await })) } @@ -159,20 +134,15 @@ fn unset( // this later, since the binary seems totally fine fn echo( exe: crate::parse::Exe, - env: &Env, + _env: &Env, cfg: command::Cfg, ) -> anyhow::Result { - async fn async_echo( - exe: crate::parse::Exe, - _env: &Env, - cfg: command::Cfg, - ) -> std::process::ExitStatus { + Ok(command::Child::new_task(move || { macro_rules! write_stdout { ($bytes:expr) => { - if let Err(e) = cfg.io().write_stdout($bytes).await { + if let Err(e) = cfg.io().write_stdout($bytes) { cfg.io() .write_stderr(format!("echo: {}", e).as_bytes()) - .await .unwrap(); return std::process::ExitStatus::from_raw(1 << 8); } @@ -189,31 +159,23 @@ fn echo( } std::process::ExitStatus::from_raw(0) - } - - Ok(command::Child::new_fut(async move { - async_echo(exe, env, cfg).await })) } #[allow(clippy::unnecessary_wraps)] fn read( exe: crate::parse::Exe, - env: &Env, + _env: &Env, cfg: command::Cfg, ) -> anyhow::Result { - async fn async_read( - exe: crate::parse::Exe, - _env: &Env, - cfg: command::Cfg, - ) -> std::process::ExitStatus { + Ok(command::Child::new_task(move || { let var = if let Some(var) = exe.args().get(0).map(String::as_str) { var } else { bail!(cfg, exe, "usage: read var"); }; - let (val, done) = match cfg.io().read_line_stdin().await { + let (val, done) = match cfg.io().read_line_stdin() { Ok((line, done)) => (line, done), Err(e) => { bail!(cfg, exe, e); @@ -222,10 +184,6 @@ fn read( std::env::set_var(var, val); std::process::ExitStatus::from_raw(if done { 1 << 8 } else { 0 }) - } - - Ok(command::Child::new_fut(async move { - async_read(exe, env, cfg).await })) } @@ -241,7 +199,7 @@ fn and( Ok(command::Child::new_wrapped(cmd.spawn(env)?)) } else { let status = env.latest_status(); - Ok(command::Child::new_fut(async move { status })) + Ok(command::Child::new_task(move || status)) } } @@ -253,7 +211,7 @@ fn or( exe.shift(); if env.latest_status().success() { let status = env.latest_status(); - Ok(command::Child::new_fut(async move { status })) + Ok(command::Child::new_task(move || status)) } else { let mut cmd = crate::runner::Command::new(exe, cfg.io().clone()); cfg.setup_command(&mut cmd); diff --git a/src/runner/command.rs b/src/runner/command.rs index c7224e6..efbf166 100644 --- a/src/runner/command.rs +++ b/src/runner/command.rs @@ -150,12 +150,12 @@ pub enum Inner { Builtin(super::builtins::Command), } -pub enum Child<'a> { +pub enum Child { Binary(tokio::process::Child), - Builtin(super::builtins::Child<'a>), + Builtin(super::builtins::Child), } -impl<'a> Child<'a> { +impl Child { pub fn id(&self) -> Option { match self { Self::Binary(child) => child.id(), @@ -170,8 +170,7 @@ impl<'a> Child<'a> { dyn std::future::Future< Output = anyhow::Result, > + Send - + Sync - + 'a, + + Sync, >, > { Box::pin(async move { diff --git a/src/runner/mod.rs b/src/runner/mod.rs index d06b332..acdb127 100644 --- a/src/runner/mod.rs +++ b/src/runner/mod.rs @@ -252,7 +252,7 @@ async fn run_pipeline( let pipeline = pipeline.eval(env).await?; let interactive = shell_write.is_some(); let (children, pg) = spawn_children(pipeline, env, &io, interactive)?; - let status = wait_children(children, pg, env, &io, shell_write).await; + let status = wait_children(children, pg, env, shell_write).await; if interactive { sys::set_foreground_pg(nix::unistd::getpid())?; } @@ -275,12 +275,12 @@ async fn write_event( Ok(()) } -fn spawn_children<'a>( +fn spawn_children( pipeline: crate::parse::Pipeline, - env: &'a Env, + env: &Env, io: &builtins::Io, interactive: bool, -) -> anyhow::Result<(Vec>, Option)> { +) -> anyhow::Result<(Vec, Option)> { let mut cmds: Vec<_> = pipeline .into_exes() .map(|exe| Command::new(exe, io.clone())) @@ -318,10 +318,9 @@ fn spawn_children<'a>( } async fn wait_children( - children: Vec>, + children: Vec, pg: Option, env: &Env, - io: &builtins::Io, shell_write: &mut Option, ) -> std::process::ExitStatus { enum Res { @@ -331,11 +330,7 @@ async fn wait_children( macro_rules! bail { ($e:expr) => { - // if writing to stderr is not possible, we still want to exit - // normally with a failure exit code - #[allow(clippy::let_underscore_drop)] - let _ = - io.write_stderr(format!("nbsh: {}\n", $e).as_bytes()).await; + eprintln!("nbsh: {}\n", $e); return std::process::ExitStatus::from_raw(1 << 8); }; } -- cgit v1.2.3-54-g00ecf