summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2022-01-08 07:55:16 -0500
committerJesse Luehrs <doy@tozt.net>2022-01-08 07:55:16 -0500
commit778b1f751a036fc025026be883e336074bde7d9b (patch)
treea2e301ed4b005dbd530053d2eb8f23fef7b4f556
parent22769358621eae3d1e65cff3d402f71b6c1cec4b (diff)
downloadnbsh-778b1f751a036fc025026be883e336074bde7d9b.tar.gz
nbsh-778b1f751a036fc025026be883e336074bde7d9b.zip
share stdin/stdout/stderr handles among builtins in a pipeline
-rw-r--r--src/mutex.rs10
-rw-r--r--src/pipeline/builtins/command.rs108
-rw-r--r--src/pipeline/builtins/mod.rs2
-rw-r--r--src/pipeline/command.rs18
-rw-r--r--src/pipeline/mod.rs29
5 files changed, 112 insertions, 55 deletions
diff --git a/src/mutex.rs b/src/mutex.rs
index 51192a8..aca5669 100644
--- a/src/mutex.rs
+++ b/src/mutex.rs
@@ -4,8 +4,10 @@ pub fn new<T>(t: T) -> async_std::sync::Arc<async_std::sync::Mutex<T>> {
async_std::sync::Arc::new(async_std::sync::Mutex::new(t))
}
-pub fn unwrap<T: std::fmt::Debug>(t: Mutex<T>) -> T {
- async_std::sync::Mutex::into_inner(
- async_std::sync::Arc::try_unwrap(t).unwrap(),
- )
+pub fn unwrap<T: std::fmt::Debug>(t: Mutex<T>) -> Option<T> {
+ if let Ok(mutex) = async_std::sync::Arc::try_unwrap(t) {
+ Some(async_std::sync::Mutex::into_inner(mutex))
+ } else {
+ None
+ }
}
diff --git a/src/pipeline/builtins/command.rs b/src/pipeline/builtins/command.rs
index 83d39cb..8119a55 100644
--- a/src/pipeline/builtins/command.rs
+++ b/src/pipeline/builtins/command.rs
@@ -23,6 +23,25 @@ impl Command {
}
}
+ pub fn new_with_io(
+ exe: crate::parse::Exe,
+ io: Io,
+ ) -> Result<Self, crate::parse::Exe> {
+ if let Some(s) = exe.exe().to_str() {
+ if let Some(f) = super::BUILTINS.get(s) {
+ Ok(Self {
+ exe,
+ f,
+ cfg: Cfg::new_with_io(io),
+ })
+ } else {
+ Err(exe)
+ }
+ } else {
+ Err(exe)
+ }
+ }
+
pub fn stdin(&mut self, fh: std::fs::File) {
self.cfg.io.set_stdin(fh);
}
@@ -69,6 +88,10 @@ impl Cfg {
}
}
+ fn new_with_io(io: Io) -> Self {
+ Self { io, pre_exec: None }
+ }
+
pub fn io(&self) -> &Io {
&self.io
}
@@ -93,6 +116,7 @@ impl Cfg {
}
}
+#[derive(Debug, Clone)]
pub struct Io {
fds: std::collections::HashMap<
std::os::unix::io::RawFd,
@@ -101,24 +125,19 @@ pub struct Io {
}
impl Io {
- fn new() -> Self {
- let mut fds = std::collections::HashMap::new();
- fds.insert(0, crate::mutex::new(unsafe { File::input(0) }));
- fds.insert(1, crate::mutex::new(unsafe { File::output(1) }));
- fds.insert(2, crate::mutex::new(unsafe { File::output(2) }));
- Self { fds }
+ pub fn new() -> Self {
+ Self {
+ fds: std::collections::HashMap::new(),
+ }
}
fn stdin(&self) -> Option<crate::mutex::Mutex<File>> {
self.fds.get(&0).map(async_std::sync::Arc::clone)
}
- fn set_stdin<T: std::os::unix::io::IntoRawFd>(&mut self, stdin: T) {
+ pub fn set_stdin<T: std::os::unix::io::IntoRawFd>(&mut self, stdin: T) {
if let Some(file) = self.fds.remove(&0) {
- let file = crate::mutex::unwrap(file);
- if file.as_raw_fd() <= 2 {
- let _ = file.into_raw_fd();
- }
+ File::maybe_drop(file);
}
self.fds.insert(
0,
@@ -130,12 +149,9 @@ impl Io {
self.fds.get(&1).map(async_std::sync::Arc::clone)
}
- fn set_stdout<T: std::os::unix::io::IntoRawFd>(&mut self, stdout: T) {
+ pub fn set_stdout<T: std::os::unix::io::IntoRawFd>(&mut self, stdout: T) {
if let Some(file) = self.fds.remove(&1) {
- let file = crate::mutex::unwrap(file);
- if file.as_raw_fd() <= 2 {
- let _ = file.into_raw_fd();
- }
+ File::maybe_drop(file);
}
self.fds.insert(
1,
@@ -147,12 +163,9 @@ impl Io {
self.fds.get(&2).map(async_std::sync::Arc::clone)
}
- fn set_stderr<T: std::os::unix::io::IntoRawFd>(&mut self, stderr: T) {
+ pub fn set_stderr<T: std::os::unix::io::IntoRawFd>(&mut self, stderr: T) {
if let Some(file) = self.fds.remove(&2) {
- let file = crate::mutex::unwrap(file);
- if file.as_raw_fd() <= 2 {
- let _ = file.into_raw_fd();
- }
+ File::maybe_drop(file);
}
self.fds.insert(
2,
@@ -221,27 +234,33 @@ impl Io {
pub fn setup_command(mut self, cmd: &mut crate::pipeline::Command) {
if let Some(stdin) = self.fds.remove(&0) {
- let stdin = crate::mutex::unwrap(stdin).into_raw_fd();
- if stdin != 0 {
- // Safety: TODO this is likely unsafe
- cmd.stdin(unsafe { std::fs::File::from_raw_fd(stdin) });
- self.fds.remove(&0);
+ if let Some(stdin) = crate::mutex::unwrap(stdin) {
+ let stdin = stdin.into_raw_fd();
+ if stdin != 0 {
+ // Safety: TODO this is likely unsafe
+ cmd.stdin(unsafe { std::fs::File::from_raw_fd(stdin) });
+ self.fds.remove(&0);
+ }
}
}
if let Some(stdout) = self.fds.remove(&1) {
- let stdout = crate::mutex::unwrap(stdout).into_raw_fd();
- if stdout != 1 {
- // Safety: TODO this is likely unsafe
- cmd.stdout(unsafe { std::fs::File::from_raw_fd(stdout) });
- self.fds.remove(&1);
+ if let Some(stdout) = crate::mutex::unwrap(stdout) {
+ let stdout = stdout.into_raw_fd();
+ if stdout != 1 {
+ // Safety: TODO this is likely unsafe
+ cmd.stdout(unsafe { std::fs::File::from_raw_fd(stdout) });
+ self.fds.remove(&1);
+ }
}
}
if let Some(stderr) = self.fds.remove(&2) {
- let stderr = crate::mutex::unwrap(stderr).into_raw_fd();
- if stderr != 2 {
- // Safety: TODO this is likely unsafe
- cmd.stderr(unsafe { std::fs::File::from_raw_fd(stderr) });
- self.fds.remove(&2);
+ if let Some(stderr) = crate::mutex::unwrap(stderr) {
+ let stderr = stderr.into_raw_fd();
+ if stderr != 2 {
+ // Safety: TODO this is likely unsafe
+ cmd.stderr(unsafe { std::fs::File::from_raw_fd(stderr) });
+ self.fds.remove(&2);
+ }
}
}
}
@@ -250,10 +269,7 @@ impl Io {
impl Drop for Io {
fn drop(&mut self) {
for (_, file) in self.fds.drain() {
- let file = crate::mutex::unwrap(file);
- if file.as_raw_fd() <= 2 {
- let _ = file.into_raw_fd();
- }
+ File::maybe_drop(file);
}
}
}
@@ -265,15 +281,23 @@ pub enum File {
}
impl File {
- unsafe fn input(fd: std::os::unix::io::RawFd) -> Self {
+ pub unsafe fn input(fd: std::os::unix::io::RawFd) -> Self {
Self::In(async_std::io::BufReader::new(
async_std::fs::File::from_raw_fd(fd),
))
}
- unsafe fn output(fd: std::os::unix::io::RawFd) -> Self {
+ pub unsafe fn output(fd: std::os::unix::io::RawFd) -> Self {
Self::Out(async_std::fs::File::from_raw_fd(fd))
}
+
+ fn maybe_drop(file: crate::mutex::Mutex<Self>) {
+ if let Some(file) = crate::mutex::unwrap(file) {
+ if file.as_raw_fd() <= 2 {
+ let _ = file.into_raw_fd();
+ }
+ }
+ }
}
impl std::os::unix::io::AsRawFd for File {
diff --git a/src/pipeline/builtins/mod.rs b/src/pipeline/builtins/mod.rs
index 5c30575..9670c0f 100644
--- a/src/pipeline/builtins/mod.rs
+++ b/src/pipeline/builtins/mod.rs
@@ -1,7 +1,7 @@
use crate::pipeline::prelude::*;
pub mod command;
-pub use command::{Child, Command};
+pub use command::{Child, Command, File, Io};
type Builtin = &'static (dyn for<'a> Fn(
crate::parse::Exe,
diff --git a/src/pipeline/command.rs b/src/pipeline/command.rs
index f9dda06..70fbe9f 100644
--- a/src/pipeline/command.rs
+++ b/src/pipeline/command.rs
@@ -23,6 +23,24 @@ impl Command {
}
}
+ pub fn new_with_io(
+ exe: crate::parse::Exe,
+ io: super::builtins::Io,
+ ) -> Self {
+ let exe_path = exe.exe().to_path_buf();
+ let redirects = exe.redirects().to_vec();
+ Self {
+ inner: super::builtins::Command::new_with_io(exe, io)
+ .map_or_else(
+ |exe| Self::new_binary(exe).inner,
+ Inner::Builtin,
+ ),
+ exe: exe_path,
+ redirects,
+ pre_exec: None,
+ }
+ }
+
#[allow(clippy::needless_pass_by_value)]
pub fn new_binary(exe: crate::parse::Exe) -> Self {
let exe_path = exe.exe().to_path_buf();
diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs
index a54951c..5c6567e 100644
--- a/src/pipeline/mod.rs
+++ b/src/pipeline/mod.rs
@@ -14,14 +14,22 @@ pub use command::{Child, Command};
mod prelude;
pub async fn run() -> anyhow::Result<i32> {
- cloexec(3)?;
- cloexec(4)?;
+ let stdin = unsafe { async_std::fs::File::from_raw_fd(0) };
+ let stdout = unsafe { async_std::fs::File::from_raw_fd(1) };
+ let stderr = unsafe { async_std::fs::File::from_raw_fd(2) };
// Safety: we don't create File instances for fd 3 or 4 anywhere else
let shell_read = unsafe { async_std::fs::File::from_raw_fd(3) };
let shell_write = unsafe { async_std::fs::File::from_raw_fd(4) };
+ cloexec(3)?;
+ cloexec(4)?;
+
+ let mut io = builtins::Io::new();
+ io.set_stdin(stdin);
+ io.set_stdout(stdout);
+ io.set_stderr(stderr);
let mut env = read_data(shell_read).await?;
- run_with_env(&mut env, &shell_write).await?;
+ run_with_env(&mut env, &io, &shell_write).await?;
let status = *env.latest_status();
env.update()?;
@@ -35,11 +43,12 @@ pub async fn run() -> anyhow::Result<i32> {
async fn run_with_env(
env: &mut Env,
+ io: &builtins::Io,
shell_write: &async_std::fs::File,
) -> anyhow::Result<()> {
let pipeline =
crate::parse::ast::Pipeline::parse(env.pipeline().unwrap())?;
- let (children, pg) = spawn_children(pipeline, env)?;
+ let (children, pg) = spawn_children(pipeline, env, io)?;
let status = wait_children(children, pg, env, shell_write).await;
env.set_status(status);
Ok(())
@@ -61,12 +70,16 @@ async fn write_event(
Ok(())
}
-fn spawn_children(
+fn spawn_children<'a>(
pipeline: crate::parse::ast::Pipeline,
- env: &Env,
-) -> anyhow::Result<(Vec<Child>, Option<nix::unistd::Pid>)> {
+ env: &'a Env,
+ io: &builtins::Io,
+) -> anyhow::Result<(Vec<Child<'a>>, Option<nix::unistd::Pid>)> {
let pipeline = pipeline.eval(env);
- let mut cmds: Vec<_> = pipeline.into_exes().map(Command::new).collect();
+ let mut cmds: Vec<_> = pipeline
+ .into_exes()
+ .map(|exe| Command::new_with_io(exe, io.clone()))
+ .collect();
for i in 0..(cmds.len() - 1) {
let (r, w) = pipe()?;
cmds[i].stdout(w);