summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2022-02-26 15:44:13 -0500
committerJesse Luehrs <doy@tozt.net>2022-02-26 15:44:13 -0500
commit07ed5629fff12ccb95216f7b07c91bb6e8bbfbc7 (patch)
tree2e6e6628cb7bbef52e8eeb66a90e69562d2ad09a /src
parent6015f1000e09e72d1355105921294e2c37cf1fc2 (diff)
downloadnbsh-07ed5629fff12ccb95216f7b07c91bb6e8bbfbc7.tar.gz
nbsh-07ed5629fff12ccb95216f7b07c91bb6e8bbfbc7.zip
remove the mutex for builtin fds
Diffstat (limited to 'src')
-rw-r--r--src/prelude.rs2
-rw-r--r--src/runner/builtins/command.rs148
-rw-r--r--src/runner/builtins/mod.rs80
-rw-r--r--src/runner/command.rs9
-rw-r--r--src/runner/mod.rs17
5 files changed, 94 insertions, 162 deletions
diff --git a/src/prelude.rs b/src/prelude.rs
index 9c14a4b..9fe5992 100644
--- a/src/prelude.rs
+++ b/src/prelude.rs
@@ -1,5 +1,7 @@
pub use crate::env::Env;
+pub use std::io::{Read as _, Write as _};
+
pub use futures_util::future::FutureExt as _;
pub use futures_util::stream::StreamExt as _;
pub use futures_util::stream::TryStreamExt as _;
diff --git a/src/runner/builtins/command.rs b/src/runner/builtins/command.rs
index e0e1853..85f6594 100644
--- a/src/runner/builtins/command.rs
+++ b/src/runner/builtins/command.rs
@@ -38,8 +38,8 @@ impl Command {
self.cfg.io.set_stderr(fh);
}
- // Safety: see pre_exec in async_std::os::unix::process::CommandExt (this
- // is just a wrapper)
+ // Safety: see pre_exec in tokio::process::Command (this is just a
+ // wrapper)
pub unsafe fn pre_exec<F>(&mut self, f: F)
where
F: 'static + FnMut() -> std::io::Result<()> + Send + Sync,
@@ -73,8 +73,8 @@ impl Cfg {
&self.io
}
- // Safety: see pre_exec in async_std::os::unix::process::CommandExt (this
- // is just a wrapper)
+ // Safety: see pre_exec in tokio::process::Command (this is just a
+ // wrapper)
pub unsafe fn pre_exec<F>(&mut self, f: F)
where
F: 'static + FnMut() -> std::io::Result<()> + Send + Sync,
@@ -97,7 +97,7 @@ impl Cfg {
pub struct Io {
fds: std::collections::HashMap<
std::os::unix::io::RawFd,
- std::sync::Arc<tokio::sync::Mutex<File>>,
+ std::sync::Arc<File>,
>,
}
@@ -108,7 +108,7 @@ impl Io {
}
}
- fn stdin(&self) -> Option<std::sync::Arc<tokio::sync::Mutex<File>>> {
+ fn stdin(&self) -> Option<std::sync::Arc<File>> {
self.fds.get(&0).map(std::sync::Arc::clone)
}
@@ -120,13 +120,11 @@ impl Io {
0,
// Safety: we just acquired stdin via into_raw_fd, which acquires
// ownership of the fd, so we are now the sole owner
- std::sync::Arc::new(tokio::sync::Mutex::new(unsafe {
- File::input(stdin.into_raw_fd())
- })),
+ std::sync::Arc::new(unsafe { File::input(stdin.into_raw_fd()) }),
);
}
- fn stdout(&self) -> Option<std::sync::Arc<tokio::sync::Mutex<File>>> {
+ fn stdout(&self) -> Option<std::sync::Arc<File>> {
self.fds.get(&1).map(std::sync::Arc::clone)
}
@@ -138,13 +136,13 @@ impl Io {
1,
// Safety: we just acquired stdout via into_raw_fd, which acquires
// ownership of the fd, so we are now the sole owner
- std::sync::Arc::new(tokio::sync::Mutex::new(unsafe {
+ std::sync::Arc::new(unsafe {
File::output(stdout.into_raw_fd())
- })),
+ }),
);
}
- fn stderr(&self) -> Option<std::sync::Arc<tokio::sync::Mutex<File>>> {
+ fn stderr(&self) -> Option<std::sync::Arc<File>> {
self.fds.get(&2).map(std::sync::Arc::clone)
}
@@ -156,9 +154,9 @@ impl Io {
2,
// Safety: we just acquired stderr via into_raw_fd, which acquires
// ownership of the fd, so we are now the sole owner
- std::sync::Arc::new(tokio::sync::Mutex::new(unsafe {
+ std::sync::Arc::new(unsafe {
File::output(stderr.into_raw_fd())
- })),
+ }),
);
}
@@ -174,17 +172,13 @@ impl Io {
crate::parse::Direction::In => {
// Safety: we just opened fd, and nothing else has
// or can use it
- std::sync::Arc::new(tokio::sync::Mutex::new(
- unsafe { File::input(fd) },
- ))
+ std::sync::Arc::new(unsafe { File::input(fd) })
}
crate::parse::Direction::Out
| crate::parse::Direction::Append => {
// Safety: we just opened fd, and nothing else has
// or can use it
- std::sync::Arc::new(tokio::sync::Mutex::new(
- unsafe { File::output(fd) },
- ))
+ std::sync::Arc::new(unsafe { File::output(fd) })
}
}
}
@@ -193,10 +187,10 @@ impl Io {
}
}
- pub async fn read_line_stdin(&self) -> anyhow::Result<(String, bool)> {
- let mut buf = vec![];
- if let Some(fh) = self.stdin() {
- if let File::In(fh) = &mut *fh.clone().lock_owned().await {
+ pub fn read_line_stdin(&self) -> anyhow::Result<(String, bool)> {
+ let mut line = vec![];
+ if let Some(file) = self.stdin() {
+ if let File::In(fh) = &*file {
// we have to read only a single character at a time here
// because stdin needs to be shared across all commands in the
// command list, some of which may be builtins and others of
@@ -205,26 +199,27 @@ impl Io {
// no longer be available to the next command, since we have
// them buffered in memory rather than them being on the stdin
// pipe.
- while let Ok(byte) = fh.read_u8().await {
- buf.push(byte);
+ for byte in fh.bytes() {
+ let byte = byte?;
+ line.push(byte);
if byte == b'\n' {
break;
}
}
}
}
- let done = buf.is_empty();
- let mut buf = String::from_utf8(buf).unwrap();
- if buf.ends_with('\n') {
- buf.truncate(buf.len() - 1);
+ let done = line.is_empty();
+ let mut line = String::from_utf8(line).unwrap();
+ if line.ends_with('\n') {
+ line.truncate(line.len() - 1);
}
- Ok((buf, done))
+ Ok((line, done))
}
- pub async fn write_stdout(&self, buf: &[u8]) -> anyhow::Result<()> {
- if let Some(fh) = self.stdout() {
- if let File::Out(fh) = &mut *fh.clone().lock_owned().await {
- Ok(fh.write_all(buf).await.map(|_| ())?)
+ pub fn write_stdout(&self, buf: &[u8]) -> anyhow::Result<()> {
+ if let Some(file) = self.stdout() {
+ if let File::Out(fh) = &*file {
+ Ok((&*fh).write_all(buf)?)
} else {
Ok(())
}
@@ -233,10 +228,10 @@ impl Io {
}
}
- pub async fn write_stderr(&self, buf: &[u8]) -> anyhow::Result<()> {
- if let Some(fh) = self.stderr() {
- if let File::Out(fh) = &mut *fh.clone().lock_owned().await {
- Ok(fh.write_all(buf).await.map(|_| ())?)
+ pub fn write_stderr(&self, buf: &[u8]) -> anyhow::Result<()> {
+ if let Some(file) = self.stderr() {
+ if let File::Out(fh) = &*file {
+ Ok((&*fh).write_all(buf)?)
} else {
Ok(())
}
@@ -248,7 +243,7 @@ impl Io {
pub fn setup_command(mut self, cmd: &mut crate::runner::Command) {
if let Some(stdin) = self.fds.remove(&0) {
if let Ok(stdin) = std::sync::Arc::try_unwrap(stdin) {
- let stdin = stdin.into_inner().into_raw_fd();
+ let stdin = stdin.into_raw_fd();
if stdin != 0 {
// Safety: we just acquired stdin via into_raw_fd, which
// acquires ownership of the fd, so we are now the sole
@@ -260,7 +255,7 @@ impl Io {
}
if let Some(stdout) = self.fds.remove(&1) {
if let Ok(stdout) = std::sync::Arc::try_unwrap(stdout) {
- let stdout = stdout.into_inner().into_raw_fd();
+ let stdout = stdout.into_raw_fd();
if stdout != 1 {
// Safety: we just acquired stdout via into_raw_fd, which
// acquires ownership of the fd, so we are now the sole
@@ -272,7 +267,7 @@ impl Io {
}
if let Some(stderr) = self.fds.remove(&2) {
if let Ok(stderr) = std::sync::Arc::try_unwrap(stderr) {
- let stderr = stderr.into_inner().into_raw_fd();
+ let stderr = stderr.into_raw_fd();
if stderr != 2 {
// Safety: we just acquired stderr via into_raw_fd, which
// acquires ownership of the fd, so we are now the sole
@@ -295,24 +290,23 @@ impl Drop for Io {
#[derive(Debug)]
pub enum File {
- In(tokio::fs::File),
- Out(tokio::fs::File),
+ In(std::fs::File),
+ Out(std::fs::File),
}
impl File {
// Safety: fd must not be owned by any other File object
pub unsafe fn input(fd: std::os::unix::io::RawFd) -> Self {
- Self::In(tokio::fs::File::from_raw_fd(fd))
+ Self::In(std::fs::File::from_raw_fd(fd))
}
// Safety: fd must not be owned by any other File object
pub unsafe fn output(fd: std::os::unix::io::RawFd) -> Self {
- Self::Out(tokio::fs::File::from_raw_fd(fd))
+ Self::Out(std::fs::File::from_raw_fd(fd))
}
- fn maybe_drop(file: std::sync::Arc<tokio::sync::Mutex<Self>>) {
+ fn maybe_drop(file: std::sync::Arc<Self>) {
if let Ok(file) = std::sync::Arc::try_unwrap(file) {
- let file = file.into_inner();
if file.as_raw_fd() <= 2 {
let _ = file.into_raw_fd();
}
@@ -331,49 +325,33 @@ impl std::os::unix::io::AsRawFd for File {
impl std::os::unix::io::IntoRawFd for File {
fn into_raw_fd(self) -> std::os::unix::io::RawFd {
match self {
- Self::In(fh) | Self::Out(fh) => {
- // XXX
- fh.try_into_std().unwrap().into_raw_fd()
- }
+ Self::In(fh) | Self::Out(fh) => fh.into_raw_fd(),
}
}
}
-pub struct Child<'a> {
- fut: std::pin::Pin<
- Box<
- dyn std::future::Future<Output = std::process::ExitStatus>
- + Sync
- + Send
- + 'a,
- >,
- >,
- wrapped_child: Option<Box<crate::runner::Child<'a>>>,
+pub enum Child {
+ Task(tokio::task::JoinHandle<std::process::ExitStatus>),
+ Wrapped(Box<crate::runner::Child>),
}
-impl<'a> Child<'a> {
- pub fn new_fut<F>(fut: F) -> Self
+impl Child {
+ pub fn new_task<F>(f: F) -> Self
where
- F: std::future::Future<Output = std::process::ExitStatus>
- + Sync
- + Send
- + 'a,
+ F: FnOnce() -> std::process::ExitStatus + Send + 'static,
{
- Self {
- fut: Box::pin(fut),
- wrapped_child: None,
- }
+ Self::Task(tokio::task::spawn_blocking(f))
}
- pub fn new_wrapped(child: crate::runner::Child<'a>) -> Self {
- Self {
- fut: Box::pin(async move { unreachable!() }),
- wrapped_child: Some(Box::new(child)),
- }
+ pub fn new_wrapped(child: crate::runner::Child) -> Self {
+ Self::Wrapped(Box::new(child))
}
pub fn id(&self) -> Option<u32> {
- self.wrapped_child.as_ref().and_then(|cmd| cmd.id())
+ match self {
+ Self::Task(_) => None,
+ Self::Wrapped(child) => child.id(),
+ }
}
pub fn status(
@@ -383,15 +361,15 @@ impl<'a> Child<'a> {
dyn std::future::Future<
Output = anyhow::Result<std::process::ExitStatus>,
> + Send
- + Sync
- + 'a,
+ + Sync,
>,
> {
Box::pin(async move {
- if let Some(child) = self.wrapped_child {
- child.status().await
- } else {
- Ok(self.fut.await)
+ match self {
+ Self::Task(task) => {
+ task.await.map_err(|e| anyhow::anyhow!(e))
+ }
+ Self::Wrapped(child) => child.status().await,
}
})
}
diff --git a/src/runner/builtins/mod.rs b/src/runner/builtins/mod.rs
index 87b5ae7..f212496 100644
--- a/src/runner/builtins/mod.rs
+++ b/src/runner/builtins/mod.rs
@@ -7,7 +7,7 @@ type Builtin = &'static (dyn for<'a> Fn(
crate::parse::Exe,
&'a Env,
command::Cfg,
-) -> anyhow::Result<command::Child<'a>>
+) -> anyhow::Result<command::Child>
+ Sync
+ Send);
@@ -33,7 +33,6 @@ macro_rules! bail {
$cfg.io().write_stderr(
format!("{}: {}\n", $exe.exe().display(), $msg).as_bytes()
)
- .await
.unwrap();
return std::process::ExitStatus::from_raw(1 << 8);
};
@@ -41,12 +40,10 @@ macro_rules! bail {
$cfg.io().write_stderr(
format!("{}: ", $exe.exe().display()).as_bytes()
)
- .await
.unwrap();
$cfg.io().write_stderr(format!($msg, $($arg)*).as_bytes())
- .await
.unwrap();
- $cfg.io().write_stderr(b"\n").await.unwrap();
+ $cfg.io().write_stderr(b"\n").unwrap();
return std::process::ExitStatus::from_raw(1 << 8);
};
}
@@ -58,21 +55,19 @@ fn cd(
env: &Env,
cfg: command::Cfg,
) -> anyhow::Result<command::Child> {
- async fn async_cd(
- exe: crate::parse::Exe,
- env: &Env,
- cfg: command::Cfg,
- ) -> std::process::ExitStatus {
+ let prev_pwd = env.prev_pwd();
+ let home = env.var("HOME");
+ Ok(command::Child::new_task(move || {
let dir = if let Some(dir) = exe.args().get(0) {
if dir.is_empty() {
".".to_string().into()
} else if dir == "-" {
- env.prev_pwd()
+ prev_pwd
} else {
dir.into()
}
} else {
- let dir = env.var("HOME");
+ let dir = home;
if let Some(dir) = dir {
dir.into()
} else {
@@ -89,24 +84,16 @@ fn cd(
);
}
std::process::ExitStatus::from_raw(0)
- }
-
- Ok(command::Child::new_fut(async move {
- async_cd(exe, env, cfg).await
}))
}
#[allow(clippy::unnecessary_wraps)]
fn set(
exe: crate::parse::Exe,
- env: &Env,
+ _env: &Env,
cfg: command::Cfg,
) -> anyhow::Result<command::Child> {
- async fn async_set(
- exe: crate::parse::Exe,
- _env: &Env,
- cfg: command::Cfg,
- ) -> std::process::ExitStatus {
+ Ok(command::Child::new_task(move || {
let k = if let Some(k) = exe.args().get(0).map(String::as_str) {
k
} else {
@@ -120,24 +107,16 @@ fn set(
std::env::set_var(k, v);
std::process::ExitStatus::from_raw(0)
- }
-
- Ok(command::Child::new_fut(async move {
- async_set(exe, env, cfg).await
}))
}
#[allow(clippy::unnecessary_wraps)]
fn unset(
exe: crate::parse::Exe,
- env: &Env,
+ _env: &Env,
cfg: command::Cfg,
) -> anyhow::Result<command::Child> {
- async fn async_unset(
- exe: crate::parse::Exe,
- _env: &Env,
- cfg: command::Cfg,
- ) -> std::process::ExitStatus {
+ Ok(command::Child::new_task(move || {
let k = if let Some(k) = exe.args().get(0).map(String::as_str) {
k
} else {
@@ -146,10 +125,6 @@ fn unset(
std::env::remove_var(k);
std::process::ExitStatus::from_raw(0)
- }
-
- Ok(command::Child::new_fut(async move {
- async_unset(exe, env, cfg).await
}))
}
@@ -159,20 +134,15 @@ fn unset(
// this later, since the binary seems totally fine
fn echo(
exe: crate::parse::Exe,
- env: &Env,
+ _env: &Env,
cfg: command::Cfg,
) -> anyhow::Result<command::Child> {
- async fn async_echo(
- exe: crate::parse::Exe,
- _env: &Env,
- cfg: command::Cfg,
- ) -> std::process::ExitStatus {
+ Ok(command::Child::new_task(move || {
macro_rules! write_stdout {
($bytes:expr) => {
- if let Err(e) = cfg.io().write_stdout($bytes).await {
+ if let Err(e) = cfg.io().write_stdout($bytes) {
cfg.io()
.write_stderr(format!("echo: {}", e).as_bytes())
- .await
.unwrap();
return std::process::ExitStatus::from_raw(1 << 8);
}
@@ -189,31 +159,23 @@ fn echo(
}
std::process::ExitStatus::from_raw(0)
- }
-
- Ok(command::Child::new_fut(async move {
- async_echo(exe, env, cfg).await
}))
}
#[allow(clippy::unnecessary_wraps)]
fn read(
exe: crate::parse::Exe,
- env: &Env,
+ _env: &Env,
cfg: command::Cfg,
) -> anyhow::Result<command::Child> {
- async fn async_read(
- exe: crate::parse::Exe,
- _env: &Env,
- cfg: command::Cfg,
- ) -> std::process::ExitStatus {
+ Ok(command::Child::new_task(move || {
let var = if let Some(var) = exe.args().get(0).map(String::as_str) {
var
} else {
bail!(cfg, exe, "usage: read var");
};
- let (val, done) = match cfg.io().read_line_stdin().await {
+ let (val, done) = match cfg.io().read_line_stdin() {
Ok((line, done)) => (line, done),
Err(e) => {
bail!(cfg, exe, e);
@@ -222,10 +184,6 @@ fn read(
std::env::set_var(var, val);
std::process::ExitStatus::from_raw(if done { 1 << 8 } else { 0 })
- }
-
- Ok(command::Child::new_fut(async move {
- async_read(exe, env, cfg).await
}))
}
@@ -241,7 +199,7 @@ fn and(
Ok(command::Child::new_wrapped(cmd.spawn(env)?))
} else {
let status = env.latest_status();
- Ok(command::Child::new_fut(async move { status }))
+ Ok(command::Child::new_task(move || status))
}
}
@@ -253,7 +211,7 @@ fn or(
exe.shift();
if env.latest_status().success() {
let status = env.latest_status();
- Ok(command::Child::new_fut(async move { status }))
+ Ok(command::Child::new_task(move || status))
} else {
let mut cmd = crate::runner::Command::new(exe, cfg.io().clone());
cfg.setup_command(&mut cmd);
diff --git a/src/runner/command.rs b/src/runner/command.rs
index c7224e6..efbf166 100644
--- a/src/runner/command.rs
+++ b/src/runner/command.rs
@@ -150,12 +150,12 @@ pub enum Inner {
Builtin(super::builtins::Command),
}
-pub enum Child<'a> {
+pub enum Child {
Binary(tokio::process::Child),
- Builtin(super::builtins::Child<'a>),
+ Builtin(super::builtins::Child),
}
-impl<'a> Child<'a> {
+impl Child {
pub fn id(&self) -> Option<u32> {
match self {
Self::Binary(child) => child.id(),
@@ -170,8 +170,7 @@ impl<'a> Child<'a> {
dyn std::future::Future<
Output = anyhow::Result<std::process::ExitStatus>,
> + Send
- + Sync
- + 'a,
+ + Sync,
>,
> {
Box::pin(async move {
diff --git a/src/runner/mod.rs b/src/runner/mod.rs
index d06b332..acdb127 100644
--- a/src/runner/mod.rs
+++ b/src/runner/mod.rs
@@ -252,7 +252,7 @@ async fn run_pipeline(
let pipeline = pipeline.eval(env).await?;
let interactive = shell_write.is_some();
let (children, pg) = spawn_children(pipeline, env, &io, interactive)?;
- let status = wait_children(children, pg, env, &io, shell_write).await;
+ let status = wait_children(children, pg, env, shell_write).await;
if interactive {
sys::set_foreground_pg(nix::unistd::getpid())?;
}
@@ -275,12 +275,12 @@ async fn write_event(
Ok(())
}
-fn spawn_children<'a>(
+fn spawn_children(
pipeline: crate::parse::Pipeline,
- env: &'a Env,
+ env: &Env,
io: &builtins::Io,
interactive: bool,
-) -> anyhow::Result<(Vec<Child<'a>>, Option<nix::unistd::Pid>)> {
+) -> anyhow::Result<(Vec<Child>, Option<nix::unistd::Pid>)> {
let mut cmds: Vec<_> = pipeline
.into_exes()
.map(|exe| Command::new(exe, io.clone()))
@@ -318,10 +318,9 @@ fn spawn_children<'a>(
}
async fn wait_children(
- children: Vec<Child<'_>>,
+ children: Vec<Child>,
pg: Option<nix::unistd::Pid>,
env: &Env,
- io: &builtins::Io,
shell_write: &mut Option<tokio::fs::File>,
) -> std::process::ExitStatus {
enum Res {
@@ -331,11 +330,7 @@ async fn wait_children(
macro_rules! bail {
($e:expr) => {
- // if writing to stderr is not possible, we still want to exit
- // normally with a failure exit code
- #[allow(clippy::let_underscore_drop)]
- let _ =
- io.write_stderr(format!("nbsh: {}\n", $e).as_bytes()).await;
+ eprintln!("nbsh: {}\n", $e);
return std::process::ExitStatus::from_raw(1 << 8);
};
}