diff options
author | Jesse Luehrs <doy@tozt.net> | 2021-12-31 03:36:33 -0500 |
---|---|---|
committer | Jesse Luehrs <doy@tozt.net> | 2021-12-31 03:36:33 -0500 |
commit | 1c2486a55c21b323f73c72c0128def0fcac061eb (patch) | |
tree | d290d26aa6dc6462aee08f8620159980b43f80c3 /src | |
parent | d82cd70272ea300fcfd98110f674580b835ffab2 (diff) | |
download | nbsh-1c2486a55c21b323f73c72c0128def0fcac061eb.tar.gz nbsh-1c2486a55c21b323f73c72c0128def0fcac061eb.zip |
basic implementation of pipes
Diffstat (limited to 'src')
-rw-r--r-- | src/main.rs | 9 | ||||
-rw-r--r-- | src/parse.rs | 17 | ||||
-rw-r--r-- | src/pipe.rs | 85 | ||||
-rw-r--r-- | src/state/history/mod.rs | 8 |
4 files changed, 111 insertions, 8 deletions
diff --git a/src/main.rs b/src/main.rs index b5b1f09..a4a0773 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,6 +43,11 @@ fn get_offset() -> time::UtcOffset { } async fn async_main() -> anyhow::Result<()> { + if std::env::args().nth(1).as_deref() == Some("--internal-pipe-runner") { + pipe::run().await; + std::process::exit(0); + } + let mut input = textmode::Input::new().await?; let mut output = textmode::Output::new().await?; @@ -136,10 +141,6 @@ async fn async_main() -> anyhow::Result<()> { } fn main() { - if std::env::args().nth(1).as_deref() == Some("--internal-pipe-runner") { - pipe::run(); - std::process::exit(0); - } match async_std::task::block_on(async_main()) { Ok(_) => (), Err(e) => { diff --git a/src/parse.rs b/src/parse.rs index dc0ec0a..526b6ce 100644 --- a/src/parse.rs +++ b/src/parse.rs @@ -62,17 +62,33 @@ impl Exe { #[derive(Debug, Clone)] pub struct Pipeline { exes: Vec<Exe>, + input_string: String, } impl Pipeline { + pub fn parse(pipeline: &str) -> Result<Self, Error> { + Ok(Self::build_ast( + Shell::parse(Rule::pipeline, pipeline) + .map_err(|e| Error::new(pipeline, anyhow::anyhow!(e)))? + .next() + .unwrap(), + )) + } + pub fn exes(&self) -> &[Exe] { &self.exes } + pub fn input_string(&self) -> &str { + &self.input_string + } + fn build_ast(pipeline: pest::iterators::Pair<Rule>) -> Self { assert!(matches!(pipeline.as_rule(), Rule::pipeline)); + let input_string = pipeline.as_str().to_string(); Self { exes: pipeline.into_inner().map(Exe::build_ast).collect(), + input_string, } } } @@ -117,6 +133,7 @@ impl Commands { } } +#[derive(Debug)] pub struct Error { input: String, e: anyhow::Error, diff --git a/src/pipe.rs b/src/pipe.rs index 7d35961..3bba117 100644 --- a/src/pipe.rs +++ b/src/pipe.rs @@ -1,3 +1,84 @@ -pub fn run() { - todo!() +use async_std::io::prelude::ReadExt as _; +use async_std::os::unix::process::CommandExt as _; +use async_std::stream::StreamExt as _; +use std::os::unix::io::{AsRawFd as _, FromRawFd as _}; +use std::os::unix::process::ExitStatusExt as _; + +async fn read_pipeline() -> crate::parse::Pipeline { + let mut r = unsafe { async_std::fs::File::from_raw_fd(3) }; + let mut s = String::new(); + r.read_to_string(&mut s).await.unwrap(); + crate::parse::Pipeline::parse(&s).unwrap() +} + +pub async fn run() { + let pipeline = read_pipeline().await; + + let mut futures = futures_util::stream::FuturesUnordered::new(); + let mut pg = None; + let mut stdin = None; + let last = pipeline.exes().len() - 1; + for (i, exe) in pipeline.exes().iter().enumerate() { + let mut cmd = async_std::process::Command::new(exe.exe()); + cmd.args(exe.args()); + if let Some(stdin) = stdin { + cmd.stdin(unsafe { + async_std::process::Stdio::from_raw_fd(stdin) + }); + } + if i < last { + let (r, w) = + nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).unwrap(); + stdin = Some(r); + cmd.stdout(unsafe { async_std::process::Stdio::from_raw_fd(w) }); + } + let pg_pid = nix::unistd::Pid::from_raw(pg.unwrap_or(0)); + unsafe { + cmd.pre_exec(move || { + nix::unistd::setpgid(nix::unistd::Pid::from_raw(0), pg_pid)?; + Ok(()) + }); + } + let child = cmd.spawn().unwrap(); + let res = nix::unistd::setpgid( + nix::unistd::Pid::from_raw(child.id().try_into().unwrap()), + pg_pid, + ); + match res { + Ok(()) => {} + Err(e) => { + if e != nix::errno::Errno::EACCES { + res.unwrap(); + } + } + } + if pg.is_none() { + pg = Some(child.id().try_into().unwrap()); + } + futures.push(async move { + (child.status_no_drop().await.unwrap(), i == last) + }); + } + + let pty = std::fs::File::open("/dev/tty").unwrap(); + nix::unistd::tcsetpgrp( + pty.as_raw_fd(), + nix::unistd::Pid::from_raw(pg.unwrap()), + ) + .unwrap(); + + let mut final_status = None; + while let Some((status, last)) = futures.next().await { + if status.signal() == Some(signal_hook::consts::signal::SIGINT) { + nix::sys::signal::raise(nix::sys::signal::SIGINT).unwrap(); + } + if last { + final_status = Some(status); + } + } + if let Some(code) = final_status.unwrap().code() { + std::process::exit(code); + } else { + std::process::exit(255); + } } diff --git a/src/state/history/mod.rs b/src/state/history/mod.rs index ede9120..5a38bc0 100644 --- a/src/state/history/mod.rs +++ b/src/state/history/mod.rs @@ -647,9 +647,13 @@ async fn run_pipeline( let child = cmd.spawn(&env.pty).unwrap(); nix::unistd::close(r).unwrap(); - let w = unsafe { std::fs::File::from_raw_fd(w) }; + let mut w = unsafe { async_std::fs::File::from_raw_fd(w) }; + let pipeline = pipeline.clone(); let fut = async move { - // TODO write data to w + w.write_all(pipeline.input_string().as_bytes()) + .await + .unwrap(); + drop(w); child.status_no_drop().await.unwrap() }; run_future(fut, env.clone()).await |