summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2021-12-31 03:36:33 -0500
committerJesse Luehrs <doy@tozt.net>2021-12-31 03:36:33 -0500
commit1c2486a55c21b323f73c72c0128def0fcac061eb (patch)
treed290d26aa6dc6462aee08f8620159980b43f80c3 /src
parentd82cd70272ea300fcfd98110f674580b835ffab2 (diff)
downloadnbsh-1c2486a55c21b323f73c72c0128def0fcac061eb.tar.gz
nbsh-1c2486a55c21b323f73c72c0128def0fcac061eb.zip
basic implementation of pipes
Diffstat (limited to 'src')
-rw-r--r--src/main.rs9
-rw-r--r--src/parse.rs17
-rw-r--r--src/pipe.rs85
-rw-r--r--src/state/history/mod.rs8
4 files changed, 111 insertions, 8 deletions
diff --git a/src/main.rs b/src/main.rs
index b5b1f09..a4a0773 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -43,6 +43,11 @@ fn get_offset() -> time::UtcOffset {
}
async fn async_main() -> anyhow::Result<()> {
+ if std::env::args().nth(1).as_deref() == Some("--internal-pipe-runner") {
+ pipe::run().await;
+ std::process::exit(0);
+ }
+
let mut input = textmode::Input::new().await?;
let mut output = textmode::Output::new().await?;
@@ -136,10 +141,6 @@ async fn async_main() -> anyhow::Result<()> {
}
fn main() {
- if std::env::args().nth(1).as_deref() == Some("--internal-pipe-runner") {
- pipe::run();
- std::process::exit(0);
- }
match async_std::task::block_on(async_main()) {
Ok(_) => (),
Err(e) => {
diff --git a/src/parse.rs b/src/parse.rs
index dc0ec0a..526b6ce 100644
--- a/src/parse.rs
+++ b/src/parse.rs
@@ -62,17 +62,33 @@ impl Exe {
#[derive(Debug, Clone)]
pub struct Pipeline {
exes: Vec<Exe>,
+ input_string: String,
}
impl Pipeline {
+ pub fn parse(pipeline: &str) -> Result<Self, Error> {
+ Ok(Self::build_ast(
+ Shell::parse(Rule::pipeline, pipeline)
+ .map_err(|e| Error::new(pipeline, anyhow::anyhow!(e)))?
+ .next()
+ .unwrap(),
+ ))
+ }
+
pub fn exes(&self) -> &[Exe] {
&self.exes
}
+ pub fn input_string(&self) -> &str {
+ &self.input_string
+ }
+
fn build_ast(pipeline: pest::iterators::Pair<Rule>) -> Self {
assert!(matches!(pipeline.as_rule(), Rule::pipeline));
+ let input_string = pipeline.as_str().to_string();
Self {
exes: pipeline.into_inner().map(Exe::build_ast).collect(),
+ input_string,
}
}
}
@@ -117,6 +133,7 @@ impl Commands {
}
}
+#[derive(Debug)]
pub struct Error {
input: String,
e: anyhow::Error,
diff --git a/src/pipe.rs b/src/pipe.rs
index 7d35961..3bba117 100644
--- a/src/pipe.rs
+++ b/src/pipe.rs
@@ -1,3 +1,84 @@
-pub fn run() {
- todo!()
+use async_std::io::prelude::ReadExt as _;
+use async_std::os::unix::process::CommandExt as _;
+use async_std::stream::StreamExt as _;
+use std::os::unix::io::{AsRawFd as _, FromRawFd as _};
+use std::os::unix::process::ExitStatusExt as _;
+
+async fn read_pipeline() -> crate::parse::Pipeline {
+ let mut r = unsafe { async_std::fs::File::from_raw_fd(3) };
+ let mut s = String::new();
+ r.read_to_string(&mut s).await.unwrap();
+ crate::parse::Pipeline::parse(&s).unwrap()
+}
+
+pub async fn run() {
+ let pipeline = read_pipeline().await;
+
+ let mut futures = futures_util::stream::FuturesUnordered::new();
+ let mut pg = None;
+ let mut stdin = None;
+ let last = pipeline.exes().len() - 1;
+ for (i, exe) in pipeline.exes().iter().enumerate() {
+ let mut cmd = async_std::process::Command::new(exe.exe());
+ cmd.args(exe.args());
+ if let Some(stdin) = stdin {
+ cmd.stdin(unsafe {
+ async_std::process::Stdio::from_raw_fd(stdin)
+ });
+ }
+ if i < last {
+ let (r, w) =
+ nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).unwrap();
+ stdin = Some(r);
+ cmd.stdout(unsafe { async_std::process::Stdio::from_raw_fd(w) });
+ }
+ let pg_pid = nix::unistd::Pid::from_raw(pg.unwrap_or(0));
+ unsafe {
+ cmd.pre_exec(move || {
+ nix::unistd::setpgid(nix::unistd::Pid::from_raw(0), pg_pid)?;
+ Ok(())
+ });
+ }
+ let child = cmd.spawn().unwrap();
+ let res = nix::unistd::setpgid(
+ nix::unistd::Pid::from_raw(child.id().try_into().unwrap()),
+ pg_pid,
+ );
+ match res {
+ Ok(()) => {}
+ Err(e) => {
+ if e != nix::errno::Errno::EACCES {
+ res.unwrap();
+ }
+ }
+ }
+ if pg.is_none() {
+ pg = Some(child.id().try_into().unwrap());
+ }
+ futures.push(async move {
+ (child.status_no_drop().await.unwrap(), i == last)
+ });
+ }
+
+ let pty = std::fs::File::open("/dev/tty").unwrap();
+ nix::unistd::tcsetpgrp(
+ pty.as_raw_fd(),
+ nix::unistd::Pid::from_raw(pg.unwrap()),
+ )
+ .unwrap();
+
+ let mut final_status = None;
+ while let Some((status, last)) = futures.next().await {
+ if status.signal() == Some(signal_hook::consts::signal::SIGINT) {
+ nix::sys::signal::raise(nix::sys::signal::SIGINT).unwrap();
+ }
+ if last {
+ final_status = Some(status);
+ }
+ }
+ if let Some(code) = final_status.unwrap().code() {
+ std::process::exit(code);
+ } else {
+ std::process::exit(255);
+ }
}
diff --git a/src/state/history/mod.rs b/src/state/history/mod.rs
index ede9120..5a38bc0 100644
--- a/src/state/history/mod.rs
+++ b/src/state/history/mod.rs
@@ -647,9 +647,13 @@ async fn run_pipeline(
let child = cmd.spawn(&env.pty).unwrap();
nix::unistd::close(r).unwrap();
- let w = unsafe { std::fs::File::from_raw_fd(w) };
+ let mut w = unsafe { async_std::fs::File::from_raw_fd(w) };
+ let pipeline = pipeline.clone();
let fut = async move {
- // TODO write data to w
+ w.write_all(pipeline.input_string().as_bytes())
+ .await
+ .unwrap();
+ drop(w);
child.status_no_drop().await.unwrap()
};
run_future(fut, env.clone()).await