From a2462bbaea13f7a3f3eb65e7430b30618bc203b8 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Fri, 25 Feb 2022 17:32:58 -0500 Subject: move to tokio --- src/runner/builtins/command.rs | 66 +++++++++++++++------------ src/runner/builtins/mod.rs | 16 +++---- src/runner/command.rs | 10 ++-- src/runner/mod.rs | 101 ++++++++++++++++++++--------------------- 4 files changed, 97 insertions(+), 96 deletions(-) (limited to 'src/runner') diff --git a/src/runner/builtins/command.rs b/src/runner/builtins/command.rs index c0d3a84..e0e1853 100644 --- a/src/runner/builtins/command.rs +++ b/src/runner/builtins/command.rs @@ -97,7 +97,7 @@ impl Cfg { pub struct Io { fds: std::collections::HashMap< std::os::unix::io::RawFd, - std::sync::Arc, + std::sync::Arc>, >, } @@ -108,7 +108,7 @@ impl Io { } } - fn stdin(&self) -> Option> { + fn stdin(&self) -> Option>> { self.fds.get(&0).map(std::sync::Arc::clone) } @@ -120,11 +120,13 @@ impl Io { 0, // Safety: we just acquired stdin via into_raw_fd, which acquires // ownership of the fd, so we are now the sole owner - std::sync::Arc::new(unsafe { File::input(stdin.into_raw_fd()) }), + std::sync::Arc::new(tokio::sync::Mutex::new(unsafe { + File::input(stdin.into_raw_fd()) + })), ); } - fn stdout(&self) -> Option> { + fn stdout(&self) -> Option>> { self.fds.get(&1).map(std::sync::Arc::clone) } @@ -136,13 +138,13 @@ impl Io { 1, // Safety: we just acquired stdout via into_raw_fd, which acquires // ownership of the fd, so we are now the sole owner - std::sync::Arc::new(unsafe { + std::sync::Arc::new(tokio::sync::Mutex::new(unsafe { File::output(stdout.into_raw_fd()) - }), + })), ); } - fn stderr(&self) -> Option> { + fn stderr(&self) -> Option>> { self.fds.get(&2).map(std::sync::Arc::clone) } @@ -154,9 +156,9 @@ impl Io { 2, // Safety: we just acquired stderr via into_raw_fd, which acquires // ownership of the fd, so we are now the sole owner - std::sync::Arc::new(unsafe { + std::sync::Arc::new(tokio::sync::Mutex::new(unsafe { File::output(stderr.into_raw_fd()) - }), + })), ); } @@ -172,13 +174,17 @@ impl Io { crate::parse::Direction::In => { // Safety: we just opened fd, and nothing else has // or can use it - std::sync::Arc::new(unsafe { File::input(fd) }) + std::sync::Arc::new(tokio::sync::Mutex::new( + unsafe { File::input(fd) }, + )) } crate::parse::Direction::Out | crate::parse::Direction::Append => { // Safety: we just opened fd, and nothing else has // or can use it - std::sync::Arc::new(unsafe { File::output(fd) }) + std::sync::Arc::new(tokio::sync::Mutex::new( + unsafe { File::output(fd) }, + )) } } } @@ -190,7 +196,7 @@ impl Io { pub async fn read_line_stdin(&self) -> anyhow::Result<(String, bool)> { let mut buf = vec![]; if let Some(fh) = self.stdin() { - if let File::In(fh) = &*fh { + if let File::In(fh) = &mut *fh.clone().lock_owned().await { // we have to read only a single character at a time here // because stdin needs to be shared across all commands in the // command list, some of which may be builtins and others of @@ -199,9 +205,7 @@ impl Io { // no longer be available to the next command, since we have // them buffered in memory rather than them being on the stdin // pipe. - let mut bytes = fh.bytes(); - while let Some(byte) = bytes.next().await { - let byte = byte?; + while let Ok(byte) = fh.read_u8().await { buf.push(byte); if byte == b'\n' { break; @@ -219,8 +223,8 @@ impl Io { pub async fn write_stdout(&self, buf: &[u8]) -> anyhow::Result<()> { if let Some(fh) = self.stdout() { - if let File::Out(fh) = &*fh { - Ok((&*fh).write_all(buf).await.map(|_| ())?) + if let File::Out(fh) = &mut *fh.clone().lock_owned().await { + Ok(fh.write_all(buf).await.map(|_| ())?) } else { Ok(()) } @@ -231,8 +235,8 @@ impl Io { pub async fn write_stderr(&self, buf: &[u8]) -> anyhow::Result<()> { if let Some(fh) = self.stderr() { - if let File::Out(fh) = &*fh { - Ok((&*fh).write_all(buf).await.map(|_| ())?) + if let File::Out(fh) = &mut *fh.clone().lock_owned().await { + Ok(fh.write_all(buf).await.map(|_| ())?) } else { Ok(()) } @@ -244,7 +248,7 @@ impl Io { pub fn setup_command(mut self, cmd: &mut crate::runner::Command) { if let Some(stdin) = self.fds.remove(&0) { if let Ok(stdin) = std::sync::Arc::try_unwrap(stdin) { - let stdin = stdin.into_raw_fd(); + let stdin = stdin.into_inner().into_raw_fd(); if stdin != 0 { // Safety: we just acquired stdin via into_raw_fd, which // acquires ownership of the fd, so we are now the sole @@ -256,7 +260,7 @@ impl Io { } if let Some(stdout) = self.fds.remove(&1) { if let Ok(stdout) = std::sync::Arc::try_unwrap(stdout) { - let stdout = stdout.into_raw_fd(); + let stdout = stdout.into_inner().into_raw_fd(); if stdout != 1 { // Safety: we just acquired stdout via into_raw_fd, which // acquires ownership of the fd, so we are now the sole @@ -268,7 +272,7 @@ impl Io { } if let Some(stderr) = self.fds.remove(&2) { if let Ok(stderr) = std::sync::Arc::try_unwrap(stderr) { - let stderr = stderr.into_raw_fd(); + let stderr = stderr.into_inner().into_raw_fd(); if stderr != 2 { // Safety: we just acquired stderr via into_raw_fd, which // acquires ownership of the fd, so we are now the sole @@ -291,23 +295,24 @@ impl Drop for Io { #[derive(Debug)] pub enum File { - In(async_std::fs::File), - Out(async_std::fs::File), + In(tokio::fs::File), + Out(tokio::fs::File), } impl File { // Safety: fd must not be owned by any other File object pub unsafe fn input(fd: std::os::unix::io::RawFd) -> Self { - Self::In(async_std::fs::File::from_raw_fd(fd)) + Self::In(tokio::fs::File::from_raw_fd(fd)) } // Safety: fd must not be owned by any other File object pub unsafe fn output(fd: std::os::unix::io::RawFd) -> Self { - Self::Out(async_std::fs::File::from_raw_fd(fd)) + Self::Out(tokio::fs::File::from_raw_fd(fd)) } - fn maybe_drop(file: std::sync::Arc) { + fn maybe_drop(file: std::sync::Arc>) { if let Ok(file) = std::sync::Arc::try_unwrap(file) { + let file = file.into_inner(); if file.as_raw_fd() <= 2 { let _ = file.into_raw_fd(); } @@ -326,7 +331,10 @@ impl std::os::unix::io::AsRawFd for File { impl std::os::unix::io::IntoRawFd for File { fn into_raw_fd(self) -> std::os::unix::io::RawFd { match self { - Self::In(fh) | Self::Out(fh) => fh.into_raw_fd(), + Self::In(fh) | Self::Out(fh) => { + // XXX + fh.try_into_std().unwrap().into_raw_fd() + } } } } @@ -373,7 +381,7 @@ impl<'a> Child<'a> { ) -> std::pin::Pin< Box< dyn std::future::Future< - Output = anyhow::Result, + Output = anyhow::Result, > + Send + Sync + 'a, diff --git a/src/runner/builtins/mod.rs b/src/runner/builtins/mod.rs index 5205856..87b5ae7 100644 --- a/src/runner/builtins/mod.rs +++ b/src/runner/builtins/mod.rs @@ -88,7 +88,7 @@ fn cd( dir.display() ); } - async_std::process::ExitStatus::from_raw(0) + std::process::ExitStatus::from_raw(0) } Ok(command::Child::new_fut(async move { @@ -119,7 +119,7 @@ fn set( }; std::env::set_var(k, v); - async_std::process::ExitStatus::from_raw(0) + std::process::ExitStatus::from_raw(0) } Ok(command::Child::new_fut(async move { @@ -145,7 +145,7 @@ fn unset( }; std::env::remove_var(k); - async_std::process::ExitStatus::from_raw(0) + std::process::ExitStatus::from_raw(0) } Ok(command::Child::new_fut(async move { @@ -174,7 +174,7 @@ fn echo( .write_stderr(format!("echo: {}", e).as_bytes()) .await .unwrap(); - return async_std::process::ExitStatus::from_raw(1 << 8); + return std::process::ExitStatus::from_raw(1 << 8); } }; } @@ -188,7 +188,7 @@ fn echo( } } - async_std::process::ExitStatus::from_raw(0) + std::process::ExitStatus::from_raw(0) } Ok(command::Child::new_fut(async move { @@ -221,11 +221,7 @@ fn read( }; std::env::set_var(var, val); - async_std::process::ExitStatus::from_raw(if done { - 1 << 8 - } else { - 0 - }) + std::process::ExitStatus::from_raw(if done { 1 << 8 } else { 0 }) } Ok(command::Child::new_fut(async move { diff --git a/src/runner/command.rs b/src/runner/command.rs index 5d4c11e..c7224e6 100644 --- a/src/runner/command.rs +++ b/src/runner/command.rs @@ -27,7 +27,7 @@ impl Command { pub fn new_binary(exe: crate::parse::Exe) -> Self { let exe_path = exe.exe().to_path_buf(); let redirects = exe.redirects().to_vec(); - let mut cmd = async_std::process::Command::new(exe.exe()); + let mut cmd = tokio::process::Command::new(exe.exe()); cmd.args(exe.args()); Self { inner: Inner::Binary(cmd), @@ -146,19 +146,19 @@ impl Command { } pub enum Inner { - Binary(async_std::process::Command), + Binary(tokio::process::Command), Builtin(super::builtins::Command), } pub enum Child<'a> { - Binary(async_std::process::Child), + Binary(tokio::process::Child), Builtin(super::builtins::Child<'a>), } impl<'a> Child<'a> { pub fn id(&self) -> Option { match self { - Self::Binary(child) => Some(child.id()), + Self::Binary(child) => child.id(), Self::Builtin(child) => child.id(), } } @@ -176,7 +176,7 @@ impl<'a> Child<'a> { > { Box::pin(async move { match self { - Self::Binary(child) => Ok(child.status_no_drop().await?), + Self::Binary(mut child) => Ok(child.wait().await?), Self::Builtin(child) => Ok(child.status().await?), } }) diff --git a/src/runner/mod.rs b/src/runner/mod.rs index 1a5003f..d06b332 100644 --- a/src/runner/mod.rs +++ b/src/runner/mod.rs @@ -70,7 +70,7 @@ enum Frame { pub async fn run( commands: &str, - shell_write: Option<&async_std::fs::File>, + shell_write: &mut Option, ) -> anyhow::Result { let mut env = Env::new_from_env()?; run_commands(commands, &mut env, shell_write).await?; @@ -86,7 +86,7 @@ pub async fn run( async fn run_commands( commands: &str, env: &mut Env, - shell_write: Option<&async_std::fs::File>, + shell_write: &mut Option, ) -> anyhow::Result<()> { let commands = crate::parse::ast::Commands::parse(commands)?; let commands = commands.commands(); @@ -152,7 +152,7 @@ async fn run_commands( .map(IntoIterator::into_iter) }) .collect::>() - .collect::, _>>().await? + .try_collect::>().await? .into_iter() .flatten() .collect() @@ -231,7 +231,7 @@ async fn run_commands( async fn run_pipeline( pipeline: crate::parse::ast::Pipeline, env: &mut Env, - shell_write: Option<&async_std::fs::File>, + shell_write: &mut Option, ) -> anyhow::Result<()> { write_event(shell_write, Event::RunPipeline(env.idx(), pipeline.span())) .await?; @@ -240,9 +240,9 @@ async fn run_pipeline( // level would not be safe, because in the case of a command line like // "echo foo; ls", we would pass the stdout fd to the ls process while it // is still open here, and may still have data buffered. - let stdin = unsafe { async_std::fs::File::from_raw_fd(0) }; - let stdout = unsafe { async_std::fs::File::from_raw_fd(1) }; - let stderr = unsafe { async_std::fs::File::from_raw_fd(2) }; + let stdin = unsafe { std::fs::File::from_raw_fd(0) }; + let stdout = unsafe { std::fs::File::from_raw_fd(1) }; + let stderr = unsafe { std::fs::File::from_raw_fd(2) }; let mut io = builtins::Io::new(); io.set_stdin(stdin); io.set_stdout(stdout); @@ -265,10 +265,10 @@ async fn run_pipeline( } async fn write_event( - fh: Option<&async_std::fs::File>, + fh: &mut Option, event: Event, ) -> anyhow::Result<()> { - if let Some(mut fh) = fh { + if let Some(fh) = fh { fh.write_all(&bincode::serialize(&event)?).await?; fh.flush().await?; } @@ -322,11 +322,11 @@ async fn wait_children( pg: Option, env: &Env, io: &builtins::Io, - shell_write: Option<&async_std::fs::File>, + shell_write: &mut Option, ) -> std::process::ExitStatus { enum Res { Child(nix::Result), - Builtin(Option<(anyhow::Result, bool)>), + Builtin((anyhow::Result, bool)), } macro_rules! bail { @@ -353,7 +353,8 @@ async fn wait_children( (sys::id_to_pid(child.id().unwrap()), (child, i == count - 1)) }) .collect(); - let mut builtins: futures_util::stream::FuturesUnordered<_> = + let mut builtin_count = builtins.len(); + let builtins: futures_util::stream::FuturesUnordered<_> = builtins .into_iter() .map(|(i, child)| async move { @@ -361,47 +362,40 @@ async fn wait_children( }) .collect(); - let (wait_w, wait_r) = async_std::channel::unbounded(); - let new_wait = move || { - if let Some(pg) = pg { - let wait_w = wait_w.clone(); - async_std::task::spawn(async move { - let res = blocking::unblock(move || { - nix::sys::wait::waitpid( - sys::neg_pid(pg), - Some(nix::sys::wait::WaitPidFlag::WUNTRACED), - ) - }) - .await; - if wait_w.is_closed() { - // we shouldn't be able to drop real process terminations + let (wait_w, wait_r) = tokio::sync::mpsc::unbounded_channel(); + if let Some(pg) = pg { + tokio::task::spawn_blocking(move || loop { + let res = nix::sys::wait::waitpid( + sys::neg_pid(pg), + Some(nix::sys::wait::WaitPidFlag::WUNTRACED), + ); + match wait_w.send(res) { + Ok(_) => {} + Err(tokio::sync::mpsc::error::SendError(res)) => { + // we should never drop wait_r while there are still valid + // things to read assert!(res.is_err()); - } else { - wait_w.send(res).await.unwrap(); + break; } - }); - } - }; - - new_wait(); - loop { - if children.is_empty() && builtins.is_empty() { - break; - } + } + }); + } - let child = async { Res::Child(wait_r.recv().await.unwrap()) }; - let builtin = async { - Res::Builtin(if builtins.is_empty() { - std::future::pending().await - } else { - builtins.next().await - }) - }; - match child.race(builtin).await { + let mut stream: futures_util::stream::SelectAll<_> = [ + tokio_stream::wrappers::UnboundedReceiverStream::new(wait_r) + .map(Res::Child) + .boxed(), + builtins.map(Res::Builtin).boxed(), + ] + .into_iter() + .collect(); + while let Some(res) = stream.next().await { + match res { Res::Child(Ok(status)) => { match status { - // we can't call child.status() here to unify these branches - // because our waitpid call already collected the status + // we can't call child.status() here to unify these + // branches because our waitpid call already collected the + // status nix::sys::wait::WaitStatus::Exited(pid, code) => { let (_, last) = children.remove(&pid).unwrap(); if last { @@ -449,12 +443,11 @@ async fn wait_children( } _ => {} } - new_wait(); } Res::Child(Err(e)) => { bail!(e); } - Res::Builtin(Some((Ok(status), last))) => { + Res::Builtin((Ok(status), last)) => { // this conversion is safe because the Signal enum is // repr(i32) #[allow(clippy::as_conversions)] @@ -470,11 +463,15 @@ async fn wait_children( if last { final_status = Some(status); } + builtin_count -= 1; } - Res::Builtin(Some((Err(e), _))) => { + Res::Builtin((Err(e), _)) => { bail!(e); } - Res::Builtin(None) => {} + } + + if children.is_empty() && builtin_count == 0 { + break; } } -- cgit v1.2.3-54-g00ecf