summaryrefslogtreecommitdiffstats
path: root/src/runner/builtins/command.rs
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2022-02-26 15:44:13 -0500
committerJesse Luehrs <doy@tozt.net>2022-02-26 15:44:13 -0500
commit07ed5629fff12ccb95216f7b07c91bb6e8bbfbc7 (patch)
tree2e6e6628cb7bbef52e8eeb66a90e69562d2ad09a /src/runner/builtins/command.rs
parent6015f1000e09e72d1355105921294e2c37cf1fc2 (diff)
downloadnbsh-07ed5629fff12ccb95216f7b07c91bb6e8bbfbc7.tar.gz
nbsh-07ed5629fff12ccb95216f7b07c91bb6e8bbfbc7.zip
remove the mutex for builtin fds
Diffstat (limited to 'src/runner/builtins/command.rs')
-rw-r--r--src/runner/builtins/command.rs148
1 files changed, 63 insertions, 85 deletions
diff --git a/src/runner/builtins/command.rs b/src/runner/builtins/command.rs
index e0e1853..85f6594 100644
--- a/src/runner/builtins/command.rs
+++ b/src/runner/builtins/command.rs
@@ -38,8 +38,8 @@ impl Command {
self.cfg.io.set_stderr(fh);
}
- // Safety: see pre_exec in async_std::os::unix::process::CommandExt (this
- // is just a wrapper)
+ // Safety: see pre_exec in tokio::process::Command (this is just a
+ // wrapper)
pub unsafe fn pre_exec<F>(&mut self, f: F)
where
F: 'static + FnMut() -> std::io::Result<()> + Send + Sync,
@@ -73,8 +73,8 @@ impl Cfg {
&self.io
}
- // Safety: see pre_exec in async_std::os::unix::process::CommandExt (this
- // is just a wrapper)
+ // Safety: see pre_exec in tokio::process::Command (this is just a
+ // wrapper)
pub unsafe fn pre_exec<F>(&mut self, f: F)
where
F: 'static + FnMut() -> std::io::Result<()> + Send + Sync,
@@ -97,7 +97,7 @@ impl Cfg {
pub struct Io {
fds: std::collections::HashMap<
std::os::unix::io::RawFd,
- std::sync::Arc<tokio::sync::Mutex<File>>,
+ std::sync::Arc<File>,
>,
}
@@ -108,7 +108,7 @@ impl Io {
}
}
- fn stdin(&self) -> Option<std::sync::Arc<tokio::sync::Mutex<File>>> {
+ fn stdin(&self) -> Option<std::sync::Arc<File>> {
self.fds.get(&0).map(std::sync::Arc::clone)
}
@@ -120,13 +120,11 @@ impl Io {
0,
// Safety: we just acquired stdin via into_raw_fd, which acquires
// ownership of the fd, so we are now the sole owner
- std::sync::Arc::new(tokio::sync::Mutex::new(unsafe {
- File::input(stdin.into_raw_fd())
- })),
+ std::sync::Arc::new(unsafe { File::input(stdin.into_raw_fd()) }),
);
}
- fn stdout(&self) -> Option<std::sync::Arc<tokio::sync::Mutex<File>>> {
+ fn stdout(&self) -> Option<std::sync::Arc<File>> {
self.fds.get(&1).map(std::sync::Arc::clone)
}
@@ -138,13 +136,13 @@ impl Io {
1,
// Safety: we just acquired stdout via into_raw_fd, which acquires
// ownership of the fd, so we are now the sole owner
- std::sync::Arc::new(tokio::sync::Mutex::new(unsafe {
+ std::sync::Arc::new(unsafe {
File::output(stdout.into_raw_fd())
- })),
+ }),
);
}
- fn stderr(&self) -> Option<std::sync::Arc<tokio::sync::Mutex<File>>> {
+ fn stderr(&self) -> Option<std::sync::Arc<File>> {
self.fds.get(&2).map(std::sync::Arc::clone)
}
@@ -156,9 +154,9 @@ impl Io {
2,
// Safety: we just acquired stderr via into_raw_fd, which acquires
// ownership of the fd, so we are now the sole owner
- std::sync::Arc::new(tokio::sync::Mutex::new(unsafe {
+ std::sync::Arc::new(unsafe {
File::output(stderr.into_raw_fd())
- })),
+ }),
);
}
@@ -174,17 +172,13 @@ impl Io {
crate::parse::Direction::In => {
// Safety: we just opened fd, and nothing else has
// or can use it
- std::sync::Arc::new(tokio::sync::Mutex::new(
- unsafe { File::input(fd) },
- ))
+ std::sync::Arc::new(unsafe { File::input(fd) })
}
crate::parse::Direction::Out
| crate::parse::Direction::Append => {
// Safety: we just opened fd, and nothing else has
// or can use it
- std::sync::Arc::new(tokio::sync::Mutex::new(
- unsafe { File::output(fd) },
- ))
+ std::sync::Arc::new(unsafe { File::output(fd) })
}
}
}
@@ -193,10 +187,10 @@ impl Io {
}
}
- pub async fn read_line_stdin(&self) -> anyhow::Result<(String, bool)> {
- let mut buf = vec![];
- if let Some(fh) = self.stdin() {
- if let File::In(fh) = &mut *fh.clone().lock_owned().await {
+ pub fn read_line_stdin(&self) -> anyhow::Result<(String, bool)> {
+ let mut line = vec![];
+ if let Some(file) = self.stdin() {
+ if let File::In(fh) = &*file {
// we have to read only a single character at a time here
// because stdin needs to be shared across all commands in the
// command list, some of which may be builtins and others of
@@ -205,26 +199,27 @@ impl Io {
// no longer be available to the next command, since we have
// them buffered in memory rather than them being on the stdin
// pipe.
- while let Ok(byte) = fh.read_u8().await {
- buf.push(byte);
+ for byte in fh.bytes() {
+ let byte = byte?;
+ line.push(byte);
if byte == b'\n' {
break;
}
}
}
}
- let done = buf.is_empty();
- let mut buf = String::from_utf8(buf).unwrap();
- if buf.ends_with('\n') {
- buf.truncate(buf.len() - 1);
+ let done = line.is_empty();
+ let mut line = String::from_utf8(line).unwrap();
+ if line.ends_with('\n') {
+ line.truncate(line.len() - 1);
}
- Ok((buf, done))
+ Ok((line, done))
}
- pub async fn write_stdout(&self, buf: &[u8]) -> anyhow::Result<()> {
- if let Some(fh) = self.stdout() {
- if let File::Out(fh) = &mut *fh.clone().lock_owned().await {
- Ok(fh.write_all(buf).await.map(|_| ())?)
+ pub fn write_stdout(&self, buf: &[u8]) -> anyhow::Result<()> {
+ if let Some(file) = self.stdout() {
+ if let File::Out(fh) = &*file {
+ Ok((&*fh).write_all(buf)?)
} else {
Ok(())
}
@@ -233,10 +228,10 @@ impl Io {
}
}
- pub async fn write_stderr(&self, buf: &[u8]) -> anyhow::Result<()> {
- if let Some(fh) = self.stderr() {
- if let File::Out(fh) = &mut *fh.clone().lock_owned().await {
- Ok(fh.write_all(buf).await.map(|_| ())?)
+ pub fn write_stderr(&self, buf: &[u8]) -> anyhow::Result<()> {
+ if let Some(file) = self.stderr() {
+ if let File::Out(fh) = &*file {
+ Ok((&*fh).write_all(buf)?)
} else {
Ok(())
}
@@ -248,7 +243,7 @@ impl Io {
pub fn setup_command(mut self, cmd: &mut crate::runner::Command) {
if let Some(stdin) = self.fds.remove(&0) {
if let Ok(stdin) = std::sync::Arc::try_unwrap(stdin) {
- let stdin = stdin.into_inner().into_raw_fd();
+ let stdin = stdin.into_raw_fd();
if stdin != 0 {
// Safety: we just acquired stdin via into_raw_fd, which
// acquires ownership of the fd, so we are now the sole
@@ -260,7 +255,7 @@ impl Io {
}
if let Some(stdout) = self.fds.remove(&1) {
if let Ok(stdout) = std::sync::Arc::try_unwrap(stdout) {
- let stdout = stdout.into_inner().into_raw_fd();
+ let stdout = stdout.into_raw_fd();
if stdout != 1 {
// Safety: we just acquired stdout via into_raw_fd, which
// acquires ownership of the fd, so we are now the sole
@@ -272,7 +267,7 @@ impl Io {
}
if let Some(stderr) = self.fds.remove(&2) {
if let Ok(stderr) = std::sync::Arc::try_unwrap(stderr) {
- let stderr = stderr.into_inner().into_raw_fd();
+ let stderr = stderr.into_raw_fd();
if stderr != 2 {
// Safety: we just acquired stderr via into_raw_fd, which
// acquires ownership of the fd, so we are now the sole
@@ -295,24 +290,23 @@ impl Drop for Io {
#[derive(Debug)]
pub enum File {
- In(tokio::fs::File),
- Out(tokio::fs::File),
+ In(std::fs::File),
+ Out(std::fs::File),
}
impl File {
// Safety: fd must not be owned by any other File object
pub unsafe fn input(fd: std::os::unix::io::RawFd) -> Self {
- Self::In(tokio::fs::File::from_raw_fd(fd))
+ Self::In(std::fs::File::from_raw_fd(fd))
}
// Safety: fd must not be owned by any other File object
pub unsafe fn output(fd: std::os::unix::io::RawFd) -> Self {
- Self::Out(tokio::fs::File::from_raw_fd(fd))
+ Self::Out(std::fs::File::from_raw_fd(fd))
}
- fn maybe_drop(file: std::sync::Arc<tokio::sync::Mutex<Self>>) {
+ fn maybe_drop(file: std::sync::Arc<Self>) {
if let Ok(file) = std::sync::Arc::try_unwrap(file) {
- let file = file.into_inner();
if file.as_raw_fd() <= 2 {
let _ = file.into_raw_fd();
}
@@ -331,49 +325,33 @@ impl std::os::unix::io::AsRawFd for File {
impl std::os::unix::io::IntoRawFd for File {
fn into_raw_fd(self) -> std::os::unix::io::RawFd {
match self {
- Self::In(fh) | Self::Out(fh) => {
- // XXX
- fh.try_into_std().unwrap().into_raw_fd()
- }
+ Self::In(fh) | Self::Out(fh) => fh.into_raw_fd(),
}
}
}
-pub struct Child<'a> {
- fut: std::pin::Pin<
- Box<
- dyn std::future::Future<Output = std::process::ExitStatus>
- + Sync
- + Send
- + 'a,
- >,
- >,
- wrapped_child: Option<Box<crate::runner::Child<'a>>>,
+pub enum Child {
+ Task(tokio::task::JoinHandle<std::process::ExitStatus>),
+ Wrapped(Box<crate::runner::Child>),
}
-impl<'a> Child<'a> {
- pub fn new_fut<F>(fut: F) -> Self
+impl Child {
+ pub fn new_task<F>(f: F) -> Self
where
- F: std::future::Future<Output = std::process::ExitStatus>
- + Sync
- + Send
- + 'a,
+ F: FnOnce() -> std::process::ExitStatus + Send + 'static,
{
- Self {
- fut: Box::pin(fut),
- wrapped_child: None,
- }
+ Self::Task(tokio::task::spawn_blocking(f))
}
- pub fn new_wrapped(child: crate::runner::Child<'a>) -> Self {
- Self {
- fut: Box::pin(async move { unreachable!() }),
- wrapped_child: Some(Box::new(child)),
- }
+ pub fn new_wrapped(child: crate::runner::Child) -> Self {
+ Self::Wrapped(Box::new(child))
}
pub fn id(&self) -> Option<u32> {
- self.wrapped_child.as_ref().and_then(|cmd| cmd.id())
+ match self {
+ Self::Task(_) => None,
+ Self::Wrapped(child) => child.id(),
+ }
}
pub fn status(
@@ -383,15 +361,15 @@ impl<'a> Child<'a> {
dyn std::future::Future<
Output = anyhow::Result<std::process::ExitStatus>,
> + Send
- + Sync
- + 'a,
+ + Sync,
>,
> {
Box::pin(async move {
- if let Some(child) = self.wrapped_child {
- child.status().await
- } else {
- Ok(self.fut.await)
+ match self {
+ Self::Task(task) => {
+ task.await.map_err(|e| anyhow::anyhow!(e))
+ }
+ Self::Wrapped(child) => child.status().await,
}
})
}