From 43600665dc50abae7f9d90d171cd14f51ba92448 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Sat, 8 Jan 2022 06:54:39 -0500 Subject: refactor builtin fd handling a bit --- src/pipeline/builtins/command.rs | 178 +++++++++++++++++++++++++-------------- 1 file changed, 114 insertions(+), 64 deletions(-) (limited to 'src/pipeline/builtins/command.rs') diff --git a/src/pipeline/builtins/command.rs b/src/pipeline/builtins/command.rs index 03fe8a2..fcf0bb8 100644 --- a/src/pipeline/builtins/command.rs +++ b/src/pipeline/builtins/command.rs @@ -57,7 +57,7 @@ impl Command { pub struct Io { fds: std::collections::HashMap< std::os::unix::io::RawFd, - std::os::unix::io::RawFd, + crate::mutex::Mutex, >, pre_exec: Option< Box std::io::Result<()> + Send + Sync>, @@ -67,67 +67,64 @@ pub struct Io { impl Io { fn new() -> Self { let mut fds = std::collections::HashMap::new(); - fds.insert(0.as_raw_fd(), 0.as_raw_fd()); - fds.insert(1.as_raw_fd(), 1.as_raw_fd()); - fds.insert(2.as_raw_fd(), 2.as_raw_fd()); + fds.insert(0, crate::mutex::new(unsafe { File::input(0) })); + fds.insert(1, crate::mutex::new(unsafe { File::output(1) })); + fds.insert(2, crate::mutex::new(unsafe { File::output(2) })); Self { fds, pre_exec: None, } } - fn stdin(&self) -> Option { - self.fds - .get(&0.as_raw_fd()) - .copied() - // Safety: TODO this is likely unsafe - .map(|fd| unsafe { async_std::fs::File::from_raw_fd(fd) }) + fn stdin(&self) -> Option> { + self.fds.get(&0).map(async_std::sync::Arc::clone) } fn set_stdin(&mut self, stdin: T) { - if let Some(fd) = self.fds.get(&0.as_raw_fd()) { - if *fd > 2 { - // Safety: TODO this is likely unsafe - drop(unsafe { async_std::fs::File::from_raw_fd(*fd) }); + if let Some(file) = self.fds.remove(&0) { + let file = crate::mutex::unwrap(file); + if file.as_raw_fd() <= 2 { + let _ = file.into_raw_fd(); } } - self.fds.insert(0.as_raw_fd(), stdin.into_raw_fd()); + self.fds.insert( + 0, + crate::mutex::new(unsafe { File::input(stdin.into_raw_fd()) }), + ); } - fn stdout(&self) -> Option { - self.fds - .get(&1.as_raw_fd()) - .copied() - // Safety: TODO this is likely unsafe - .map(|fd| unsafe { async_std::fs::File::from_raw_fd(fd) }) + fn stdout(&self) -> Option> { + self.fds.get(&1).map(async_std::sync::Arc::clone) } fn set_stdout(&mut self, stdout: T) { - if let Some(fd) = self.fds.get(&1.as_raw_fd()) { - if *fd > 2 { - // Safety: TODO this is likely unsafe - drop(unsafe { async_std::fs::File::from_raw_fd(*fd) }); + if let Some(file) = self.fds.remove(&1) { + let file = crate::mutex::unwrap(file); + if file.as_raw_fd() <= 2 { + let _ = file.into_raw_fd(); } } - self.fds.insert(1.as_raw_fd(), stdout.into_raw_fd()); + self.fds.insert( + 1, + crate::mutex::new(unsafe { File::output(stdout.into_raw_fd()) }), + ); } - fn stderr(&self) -> Option { - self.fds - .get(&2.as_raw_fd()) - .copied() - // Safety: TODO this is likely unsafe - .map(|fd| unsafe { async_std::fs::File::from_raw_fd(fd) }) + fn stderr(&self) -> Option> { + self.fds.get(&2).map(async_std::sync::Arc::clone) } fn set_stderr(&mut self, stderr: T) { - if let Some(fd) = self.fds.get(&2.as_raw_fd()) { - if *fd > 2 { - // Safety: TODO this is likely unsafe - drop(unsafe { async_std::fs::File::from_raw_fd(*fd) }); + if let Some(file) = self.fds.remove(&2) { + let file = crate::mutex::unwrap(file); + if file.as_raw_fd() <= 2 { + let _ = file.into_raw_fd(); } } - self.fds.insert(2.as_raw_fd(), stderr.into_raw_fd()); + self.fds.insert( + 2, + crate::mutex::new(unsafe { File::output(stderr.into_raw_fd()) }), + ); } // Safety: see pre_exec in async_std::os::unix::process::CommandExt (this @@ -142,9 +139,20 @@ impl Io { pub fn apply_redirects(&mut self, redirects: &[crate::parse::Redirect]) { for redirect in redirects { let to = match &redirect.to { - crate::parse::RedirectTarget::Fd(fd) => self.fds[fd], + crate::parse::RedirectTarget::Fd(fd) => { + async_std::sync::Arc::clone(&self.fds[fd]) + } crate::parse::RedirectTarget::File(path) => { - redirect.dir.open(path).unwrap() + let fd = redirect.dir.open(path).unwrap(); + match redirect.dir { + crate::parse::Direction::In => { + crate::mutex::new(unsafe { File::input(fd) }) + } + crate::parse::Direction::Out + | crate::parse::Direction::Append => { + crate::mutex::new(unsafe { File::output(fd) }) + } + } } }; self.fds.insert(redirect.from, to); @@ -152,58 +160,64 @@ impl Io { } pub async fn read_stdin(&self, buf: &mut [u8]) -> anyhow::Result { - if let Some(mut fh) = self.stdin() { - let res = fh.read(buf).await; - let _ = fh.into_raw_fd(); - Ok(res?) + if let Some(fh) = self.stdin() { + if let File::In(fh) = &mut *fh.lock_arc().await { + Ok(fh.read(buf).await?) + } else { + Ok(0) + } } else { Ok(0) } } pub async fn write_stdout(&self, buf: &[u8]) -> anyhow::Result<()> { - if let Some(mut fh) = self.stdout() { - let res = fh.write_all(buf).await; - let _ = fh.into_raw_fd(); - Ok(res.map(|_| ())?) + if let Some(fh) = self.stdout() { + if let File::Out(fh) = &mut *fh.lock_arc().await { + Ok(fh.write_all(buf).await.map(|_| ())?) + } else { + Ok(()) + } } else { Ok(()) } } pub async fn write_stderr(&self, buf: &[u8]) -> anyhow::Result<()> { - if let Some(mut fh) = self.stderr() { - let res = fh.write_all(buf).await; - let _ = fh.into_raw_fd(); - Ok(res.map(|_| ())?) + if let Some(fh) = self.stderr() { + if let File::Out(fh) = &mut *fh.lock_arc().await { + Ok(fh.write_all(buf).await.map(|_| ())?) + } else { + Ok(()) + } } else { Ok(()) } } pub fn setup_command(mut self, cmd: &mut crate::pipeline::Command) { - if let Some(stdin) = self.stdin() { - let stdin = stdin.into_raw_fd(); + if let Some(stdin) = self.fds.remove(&0) { + let stdin = crate::mutex::unwrap(stdin).into_raw_fd(); if stdin != 0 { // Safety: TODO this is likely unsafe cmd.stdin(unsafe { std::fs::File::from_raw_fd(stdin) }); - self.fds.remove(&0.as_raw_fd()); + self.fds.remove(&0); } } - if let Some(stdout) = self.stdout() { - let stdout = stdout.into_raw_fd(); + if let Some(stdout) = self.fds.remove(&1) { + let stdout = crate::mutex::unwrap(stdout).into_raw_fd(); if stdout != 1 { // Safety: TODO this is likely unsafe cmd.stdout(unsafe { std::fs::File::from_raw_fd(stdout) }); - self.fds.remove(&1.as_raw_fd()); + self.fds.remove(&1); } } - if let Some(stderr) = self.stderr() { - let stderr = stderr.into_raw_fd(); + if let Some(stderr) = self.fds.remove(&2) { + let stderr = crate::mutex::unwrap(stderr).into_raw_fd(); if stderr != 2 { // Safety: TODO this is likely unsafe cmd.stderr(unsafe { std::fs::File::from_raw_fd(stderr) }); - self.fds.remove(&2.as_raw_fd()); + self.fds.remove(&2); } } if let Some(pre_exec) = self.pre_exec.take() { @@ -217,15 +231,51 @@ impl Io { impl Drop for Io { fn drop(&mut self) { - for fd in self.fds.values() { - if *fd > 2 { - // Safety: TODO this is likely unsafe - drop(unsafe { std::fs::File::from_raw_fd(*fd) }); + for (_, file) in self.fds.drain() { + let file = crate::mutex::unwrap(file); + if file.as_raw_fd() <= 2 { + let _ = file.into_raw_fd(); } } } } +#[derive(Debug)] +pub enum File { + In(async_std::io::BufReader), + Out(async_std::fs::File), +} + +impl File { + unsafe fn input(fd: std::os::unix::io::RawFd) -> Self { + Self::In(async_std::io::BufReader::new( + async_std::fs::File::from_raw_fd(fd), + )) + } + + unsafe fn output(fd: std::os::unix::io::RawFd) -> Self { + Self::Out(async_std::fs::File::from_raw_fd(fd)) + } +} + +impl std::os::unix::io::AsRawFd for File { + fn as_raw_fd(&self) -> std::os::unix::io::RawFd { + match self { + Self::In(fh) => fh.get_ref().as_raw_fd(), + Self::Out(fh) => fh.as_raw_fd(), + } + } +} + +impl std::os::unix::io::IntoRawFd for File { + fn into_raw_fd(self) -> std::os::unix::io::RawFd { + match self { + Self::In(fh) => fh.into_inner().into_raw_fd(), + Self::Out(fh) => fh.into_raw_fd(), + } + } +} + pub struct Child<'a> { fut: std::pin::Pin< Box< -- cgit v1.2.3-54-g00ecf