diff options
-rw-r--r-- | Cargo.lock | 36 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | src/main.rs | 9 | ||||
-rw-r--r-- | src/parse.rs | 17 | ||||
-rw-r--r-- | src/pipe.rs | 85 | ||||
-rw-r--r-- | src/state/history/mod.rs | 8 |
6 files changed, 146 insertions, 10 deletions
@@ -301,9 +301,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.17" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88d1c26957f23603395cd326b0ffe64124b818f4449552f960d815cfba83a53d" +checksum = "d0c8ff0461b82559810cdccfde3215c3f373807f5e5232b71479bff7bb2583d7" [[package]] name = "futures-io" @@ -327,6 +327,37 @@ dependencies = [ ] [[package]] +name = "futures-macro" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dbd947adfffb0efc70599b3ddcf7b5597bb5fa9e245eb99f62b3a5f7bb8bd3c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-task" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ee7c6485c30167ce4dfb83ac568a849fe53274c831081476ee13e0dce1aad72" + +[[package]] +name = "futures-util" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b5cf40b47a271f77a8b1bec03ca09044d99d2372c0de244e66430761127164" +dependencies = [ + "futures-core", + "futures-macro", + "futures-task", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] name = "generic-array" version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -463,6 +494,7 @@ dependencies = [ "anyhow", "async-std", "futures-lite", + "futures-util", "hostname", "libc", "nix", @@ -9,6 +9,7 @@ license = "MIT" anyhow = "1.0.51" async-std = { version = "1.10.0", features = ["unstable"] } futures-lite = "1.12.0" +futures-util = "0.3.19" hostname = "0.3.1" libc = "0.2.112" nix = "0.23.0" diff --git a/src/main.rs b/src/main.rs index b5b1f09..a4a0773 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,6 +43,11 @@ fn get_offset() -> time::UtcOffset { } async fn async_main() -> anyhow::Result<()> { + if std::env::args().nth(1).as_deref() == Some("--internal-pipe-runner") { + pipe::run().await; + std::process::exit(0); + } + let mut input = textmode::Input::new().await?; let mut output = textmode::Output::new().await?; @@ -136,10 +141,6 @@ async fn async_main() -> anyhow::Result<()> { } fn main() { - if std::env::args().nth(1).as_deref() == Some("--internal-pipe-runner") { - pipe::run(); - std::process::exit(0); - } match async_std::task::block_on(async_main()) { Ok(_) => (), Err(e) => { diff --git a/src/parse.rs b/src/parse.rs index dc0ec0a..526b6ce 100644 --- a/src/parse.rs +++ b/src/parse.rs @@ -62,17 +62,33 @@ impl Exe { #[derive(Debug, Clone)] pub struct Pipeline { exes: Vec<Exe>, + input_string: String, } impl Pipeline { + pub fn parse(pipeline: &str) -> Result<Self, Error> { + Ok(Self::build_ast( + Shell::parse(Rule::pipeline, pipeline) + .map_err(|e| Error::new(pipeline, anyhow::anyhow!(e)))? + .next() + .unwrap(), + )) + } + pub fn exes(&self) -> &[Exe] { &self.exes } + pub fn input_string(&self) -> &str { + &self.input_string + } + fn build_ast(pipeline: pest::iterators::Pair<Rule>) -> Self { assert!(matches!(pipeline.as_rule(), Rule::pipeline)); + let input_string = pipeline.as_str().to_string(); Self { exes: pipeline.into_inner().map(Exe::build_ast).collect(), + input_string, } } } @@ -117,6 +133,7 @@ impl Commands { } } +#[derive(Debug)] pub struct Error { input: String, e: anyhow::Error, diff --git a/src/pipe.rs b/src/pipe.rs index 7d35961..3bba117 100644 --- a/src/pipe.rs +++ b/src/pipe.rs @@ -1,3 +1,84 @@ -pub fn run() { - todo!() +use async_std::io::prelude::ReadExt as _; +use async_std::os::unix::process::CommandExt as _; +use async_std::stream::StreamExt as _; +use std::os::unix::io::{AsRawFd as _, FromRawFd as _}; +use std::os::unix::process::ExitStatusExt as _; + +async fn read_pipeline() -> crate::parse::Pipeline { + let mut r = unsafe { async_std::fs::File::from_raw_fd(3) }; + let mut s = String::new(); + r.read_to_string(&mut s).await.unwrap(); + crate::parse::Pipeline::parse(&s).unwrap() +} + +pub async fn run() { + let pipeline = read_pipeline().await; + + let mut futures = futures_util::stream::FuturesUnordered::new(); + let mut pg = None; + let mut stdin = None; + let last = pipeline.exes().len() - 1; + for (i, exe) in pipeline.exes().iter().enumerate() { + let mut cmd = async_std::process::Command::new(exe.exe()); + cmd.args(exe.args()); + if let Some(stdin) = stdin { + cmd.stdin(unsafe { + async_std::process::Stdio::from_raw_fd(stdin) + }); + } + if i < last { + let (r, w) = + nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).unwrap(); + stdin = Some(r); + cmd.stdout(unsafe { async_std::process::Stdio::from_raw_fd(w) }); + } + let pg_pid = nix::unistd::Pid::from_raw(pg.unwrap_or(0)); + unsafe { + cmd.pre_exec(move || { + nix::unistd::setpgid(nix::unistd::Pid::from_raw(0), pg_pid)?; + Ok(()) + }); + } + let child = cmd.spawn().unwrap(); + let res = nix::unistd::setpgid( + nix::unistd::Pid::from_raw(child.id().try_into().unwrap()), + pg_pid, + ); + match res { + Ok(()) => {} + Err(e) => { + if e != nix::errno::Errno::EACCES { + res.unwrap(); + } + } + } + if pg.is_none() { + pg = Some(child.id().try_into().unwrap()); + } + futures.push(async move { + (child.status_no_drop().await.unwrap(), i == last) + }); + } + + let pty = std::fs::File::open("/dev/tty").unwrap(); + nix::unistd::tcsetpgrp( + pty.as_raw_fd(), + nix::unistd::Pid::from_raw(pg.unwrap()), + ) + .unwrap(); + + let mut final_status = None; + while let Some((status, last)) = futures.next().await { + if status.signal() == Some(signal_hook::consts::signal::SIGINT) { + nix::sys::signal::raise(nix::sys::signal::SIGINT).unwrap(); + } + if last { + final_status = Some(status); + } + } + if let Some(code) = final_status.unwrap().code() { + std::process::exit(code); + } else { + std::process::exit(255); + } } diff --git a/src/state/history/mod.rs b/src/state/history/mod.rs index ede9120..5a38bc0 100644 --- a/src/state/history/mod.rs +++ b/src/state/history/mod.rs @@ -647,9 +647,13 @@ async fn run_pipeline( let child = cmd.spawn(&env.pty).unwrap(); nix::unistd::close(r).unwrap(); - let w = unsafe { std::fs::File::from_raw_fd(w) }; + let mut w = unsafe { async_std::fs::File::from_raw_fd(w) }; + let pipeline = pipeline.clone(); let fut = async move { - // TODO write data to w + w.write_all(pipeline.input_string().as_bytes()) + .await + .unwrap(); + drop(w); child.status_no_drop().await.unwrap() }; run_future(fut, env.clone()).await |