From 07ed5629fff12ccb95216f7b07c91bb6e8bbfbc7 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Sat, 26 Feb 2022 15:44:13 -0500 Subject: remove the mutex for builtin fds --- src/runner/builtins/command.rs | 148 ++++++++++++++++++----------------------- 1 file changed, 63 insertions(+), 85 deletions(-) (limited to 'src/runner/builtins/command.rs') diff --git a/src/runner/builtins/command.rs b/src/runner/builtins/command.rs index e0e1853..85f6594 100644 --- a/src/runner/builtins/command.rs +++ b/src/runner/builtins/command.rs @@ -38,8 +38,8 @@ impl Command { self.cfg.io.set_stderr(fh); } - // Safety: see pre_exec in async_std::os::unix::process::CommandExt (this - // is just a wrapper) + // Safety: see pre_exec in tokio::process::Command (this is just a + // wrapper) pub unsafe fn pre_exec(&mut self, f: F) where F: 'static + FnMut() -> std::io::Result<()> + Send + Sync, @@ -73,8 +73,8 @@ impl Cfg { &self.io } - // Safety: see pre_exec in async_std::os::unix::process::CommandExt (this - // is just a wrapper) + // Safety: see pre_exec in tokio::process::Command (this is just a + // wrapper) pub unsafe fn pre_exec(&mut self, f: F) where F: 'static + FnMut() -> std::io::Result<()> + Send + Sync, @@ -97,7 +97,7 @@ impl Cfg { pub struct Io { fds: std::collections::HashMap< std::os::unix::io::RawFd, - std::sync::Arc>, + std::sync::Arc, >, } @@ -108,7 +108,7 @@ impl Io { } } - fn stdin(&self) -> Option>> { + fn stdin(&self) -> Option> { self.fds.get(&0).map(std::sync::Arc::clone) } @@ -120,13 +120,11 @@ impl Io { 0, // Safety: we just acquired stdin via into_raw_fd, which acquires // ownership of the fd, so we are now the sole owner - std::sync::Arc::new(tokio::sync::Mutex::new(unsafe { - File::input(stdin.into_raw_fd()) - })), + std::sync::Arc::new(unsafe { File::input(stdin.into_raw_fd()) }), ); } - fn stdout(&self) -> Option>> { + fn stdout(&self) -> Option> { self.fds.get(&1).map(std::sync::Arc::clone) } @@ -138,13 +136,13 @@ impl Io { 1, // Safety: we just acquired stdout via into_raw_fd, which acquires // ownership of the fd, so we are now the sole owner - std::sync::Arc::new(tokio::sync::Mutex::new(unsafe { + std::sync::Arc::new(unsafe { File::output(stdout.into_raw_fd()) - })), + }), ); } - fn stderr(&self) -> Option>> { + fn stderr(&self) -> Option> { self.fds.get(&2).map(std::sync::Arc::clone) } @@ -156,9 +154,9 @@ impl Io { 2, // Safety: we just acquired stderr via into_raw_fd, which acquires // ownership of the fd, so we are now the sole owner - std::sync::Arc::new(tokio::sync::Mutex::new(unsafe { + std::sync::Arc::new(unsafe { File::output(stderr.into_raw_fd()) - })), + }), ); } @@ -174,17 +172,13 @@ impl Io { crate::parse::Direction::In => { // Safety: we just opened fd, and nothing else has // or can use it - std::sync::Arc::new(tokio::sync::Mutex::new( - unsafe { File::input(fd) }, - )) + std::sync::Arc::new(unsafe { File::input(fd) }) } crate::parse::Direction::Out | crate::parse::Direction::Append => { // Safety: we just opened fd, and nothing else has // or can use it - std::sync::Arc::new(tokio::sync::Mutex::new( - unsafe { File::output(fd) }, - )) + std::sync::Arc::new(unsafe { File::output(fd) }) } } } @@ -193,10 +187,10 @@ impl Io { } } - pub async fn read_line_stdin(&self) -> anyhow::Result<(String, bool)> { - let mut buf = vec![]; - if let Some(fh) = self.stdin() { - if let File::In(fh) = &mut *fh.clone().lock_owned().await { + pub fn read_line_stdin(&self) -> anyhow::Result<(String, bool)> { + let mut line = vec![]; + if let Some(file) = self.stdin() { + if let File::In(fh) = &*file { // we have to read only a single character at a time here // because stdin needs to be shared across all commands in the // command list, some of which may be builtins and others of @@ -205,26 +199,27 @@ impl Io { // no longer be available to the next command, since we have // them buffered in memory rather than them being on the stdin // pipe. - while let Ok(byte) = fh.read_u8().await { - buf.push(byte); + for byte in fh.bytes() { + let byte = byte?; + line.push(byte); if byte == b'\n' { break; } } } } - let done = buf.is_empty(); - let mut buf = String::from_utf8(buf).unwrap(); - if buf.ends_with('\n') { - buf.truncate(buf.len() - 1); + let done = line.is_empty(); + let mut line = String::from_utf8(line).unwrap(); + if line.ends_with('\n') { + line.truncate(line.len() - 1); } - Ok((buf, done)) + Ok((line, done)) } - pub async fn write_stdout(&self, buf: &[u8]) -> anyhow::Result<()> { - if let Some(fh) = self.stdout() { - if let File::Out(fh) = &mut *fh.clone().lock_owned().await { - Ok(fh.write_all(buf).await.map(|_| ())?) + pub fn write_stdout(&self, buf: &[u8]) -> anyhow::Result<()> { + if let Some(file) = self.stdout() { + if let File::Out(fh) = &*file { + Ok((&*fh).write_all(buf)?) } else { Ok(()) } @@ -233,10 +228,10 @@ impl Io { } } - pub async fn write_stderr(&self, buf: &[u8]) -> anyhow::Result<()> { - if let Some(fh) = self.stderr() { - if let File::Out(fh) = &mut *fh.clone().lock_owned().await { - Ok(fh.write_all(buf).await.map(|_| ())?) + pub fn write_stderr(&self, buf: &[u8]) -> anyhow::Result<()> { + if let Some(file) = self.stderr() { + if let File::Out(fh) = &*file { + Ok((&*fh).write_all(buf)?) } else { Ok(()) } @@ -248,7 +243,7 @@ impl Io { pub fn setup_command(mut self, cmd: &mut crate::runner::Command) { if let Some(stdin) = self.fds.remove(&0) { if let Ok(stdin) = std::sync::Arc::try_unwrap(stdin) { - let stdin = stdin.into_inner().into_raw_fd(); + let stdin = stdin.into_raw_fd(); if stdin != 0 { // Safety: we just acquired stdin via into_raw_fd, which // acquires ownership of the fd, so we are now the sole @@ -260,7 +255,7 @@ impl Io { } if let Some(stdout) = self.fds.remove(&1) { if let Ok(stdout) = std::sync::Arc::try_unwrap(stdout) { - let stdout = stdout.into_inner().into_raw_fd(); + let stdout = stdout.into_raw_fd(); if stdout != 1 { // Safety: we just acquired stdout via into_raw_fd, which // acquires ownership of the fd, so we are now the sole @@ -272,7 +267,7 @@ impl Io { } if let Some(stderr) = self.fds.remove(&2) { if let Ok(stderr) = std::sync::Arc::try_unwrap(stderr) { - let stderr = stderr.into_inner().into_raw_fd(); + let stderr = stderr.into_raw_fd(); if stderr != 2 { // Safety: we just acquired stderr via into_raw_fd, which // acquires ownership of the fd, so we are now the sole @@ -295,24 +290,23 @@ impl Drop for Io { #[derive(Debug)] pub enum File { - In(tokio::fs::File), - Out(tokio::fs::File), + In(std::fs::File), + Out(std::fs::File), } impl File { // Safety: fd must not be owned by any other File object pub unsafe fn input(fd: std::os::unix::io::RawFd) -> Self { - Self::In(tokio::fs::File::from_raw_fd(fd)) + Self::In(std::fs::File::from_raw_fd(fd)) } // Safety: fd must not be owned by any other File object pub unsafe fn output(fd: std::os::unix::io::RawFd) -> Self { - Self::Out(tokio::fs::File::from_raw_fd(fd)) + Self::Out(std::fs::File::from_raw_fd(fd)) } - fn maybe_drop(file: std::sync::Arc>) { + fn maybe_drop(file: std::sync::Arc) { if let Ok(file) = std::sync::Arc::try_unwrap(file) { - let file = file.into_inner(); if file.as_raw_fd() <= 2 { let _ = file.into_raw_fd(); } @@ -331,49 +325,33 @@ impl std::os::unix::io::AsRawFd for File { impl std::os::unix::io::IntoRawFd for File { fn into_raw_fd(self) -> std::os::unix::io::RawFd { match self { - Self::In(fh) | Self::Out(fh) => { - // XXX - fh.try_into_std().unwrap().into_raw_fd() - } + Self::In(fh) | Self::Out(fh) => fh.into_raw_fd(), } } } -pub struct Child<'a> { - fut: std::pin::Pin< - Box< - dyn std::future::Future - + Sync - + Send - + 'a, - >, - >, - wrapped_child: Option>>, +pub enum Child { + Task(tokio::task::JoinHandle), + Wrapped(Box), } -impl<'a> Child<'a> { - pub fn new_fut(fut: F) -> Self +impl Child { + pub fn new_task(f: F) -> Self where - F: std::future::Future - + Sync - + Send - + 'a, + F: FnOnce() -> std::process::ExitStatus + Send + 'static, { - Self { - fut: Box::pin(fut), - wrapped_child: None, - } + Self::Task(tokio::task::spawn_blocking(f)) } - pub fn new_wrapped(child: crate::runner::Child<'a>) -> Self { - Self { - fut: Box::pin(async move { unreachable!() }), - wrapped_child: Some(Box::new(child)), - } + pub fn new_wrapped(child: crate::runner::Child) -> Self { + Self::Wrapped(Box::new(child)) } pub fn id(&self) -> Option { - self.wrapped_child.as_ref().and_then(|cmd| cmd.id()) + match self { + Self::Task(_) => None, + Self::Wrapped(child) => child.id(), + } } pub fn status( @@ -383,15 +361,15 @@ impl<'a> Child<'a> { dyn std::future::Future< Output = anyhow::Result, > + Send - + Sync - + 'a, + + Sync, >, > { Box::pin(async move { - if let Some(child) = self.wrapped_child { - child.status().await - } else { - Ok(self.fut.await) + match self { + Self::Task(task) => { + task.await.map_err(|e| anyhow::anyhow!(e)) + } + Self::Wrapped(child) => child.status().await, } }) } -- cgit v1.2.3-54-g00ecf