summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2021-12-31 03:36:33 -0500
committerJesse Luehrs <doy@tozt.net>2021-12-31 03:36:33 -0500
commit1c2486a55c21b323f73c72c0128def0fcac061eb (patch)
treed290d26aa6dc6462aee08f8620159980b43f80c3
parentd82cd70272ea300fcfd98110f674580b835ffab2 (diff)
downloadnbsh-1c2486a55c21b323f73c72c0128def0fcac061eb.tar.gz
nbsh-1c2486a55c21b323f73c72c0128def0fcac061eb.zip
basic implementation of pipes
-rw-r--r--Cargo.lock36
-rw-r--r--Cargo.toml1
-rw-r--r--src/main.rs9
-rw-r--r--src/parse.rs17
-rw-r--r--src/pipe.rs85
-rw-r--r--src/state/history/mod.rs8
6 files changed, 146 insertions, 10 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 88a9329..24c3b03 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -301,9 +301,9 @@ dependencies = [
[[package]]
name = "futures-core"
-version = "0.3.17"
+version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "88d1c26957f23603395cd326b0ffe64124b818f4449552f960d815cfba83a53d"
+checksum = "d0c8ff0461b82559810cdccfde3215c3f373807f5e5232b71479bff7bb2583d7"
[[package]]
name = "futures-io"
@@ -327,6 +327,37 @@ dependencies = [
]
[[package]]
+name = "futures-macro"
+version = "0.3.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6dbd947adfffb0efc70599b3ddcf7b5597bb5fa9e245eb99f62b3a5f7bb8bd3c"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "futures-task"
+version = "0.3.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6ee7c6485c30167ce4dfb83ac568a849fe53274c831081476ee13e0dce1aad72"
+
+[[package]]
+name = "futures-util"
+version = "0.3.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d9b5cf40b47a271f77a8b1bec03ca09044d99d2372c0de244e66430761127164"
+dependencies = [
+ "futures-core",
+ "futures-macro",
+ "futures-task",
+ "pin-project-lite",
+ "pin-utils",
+ "slab",
+]
+
+[[package]]
name = "generic-array"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -463,6 +494,7 @@ dependencies = [
"anyhow",
"async-std",
"futures-lite",
+ "futures-util",
"hostname",
"libc",
"nix",
diff --git a/Cargo.toml b/Cargo.toml
index 47e83d2..65b5a6f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,6 +9,7 @@ license = "MIT"
anyhow = "1.0.51"
async-std = { version = "1.10.0", features = ["unstable"] }
futures-lite = "1.12.0"
+futures-util = "0.3.19"
hostname = "0.3.1"
libc = "0.2.112"
nix = "0.23.0"
diff --git a/src/main.rs b/src/main.rs
index b5b1f09..a4a0773 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -43,6 +43,11 @@ fn get_offset() -> time::UtcOffset {
}
async fn async_main() -> anyhow::Result<()> {
+ if std::env::args().nth(1).as_deref() == Some("--internal-pipe-runner") {
+ pipe::run().await;
+ std::process::exit(0);
+ }
+
let mut input = textmode::Input::new().await?;
let mut output = textmode::Output::new().await?;
@@ -136,10 +141,6 @@ async fn async_main() -> anyhow::Result<()> {
}
fn main() {
- if std::env::args().nth(1).as_deref() == Some("--internal-pipe-runner") {
- pipe::run();
- std::process::exit(0);
- }
match async_std::task::block_on(async_main()) {
Ok(_) => (),
Err(e) => {
diff --git a/src/parse.rs b/src/parse.rs
index dc0ec0a..526b6ce 100644
--- a/src/parse.rs
+++ b/src/parse.rs
@@ -62,17 +62,33 @@ impl Exe {
#[derive(Debug, Clone)]
pub struct Pipeline {
exes: Vec<Exe>,
+ input_string: String,
}
impl Pipeline {
+ pub fn parse(pipeline: &str) -> Result<Self, Error> {
+ Ok(Self::build_ast(
+ Shell::parse(Rule::pipeline, pipeline)
+ .map_err(|e| Error::new(pipeline, anyhow::anyhow!(e)))?
+ .next()
+ .unwrap(),
+ ))
+ }
+
pub fn exes(&self) -> &[Exe] {
&self.exes
}
+ pub fn input_string(&self) -> &str {
+ &self.input_string
+ }
+
fn build_ast(pipeline: pest::iterators::Pair<Rule>) -> Self {
assert!(matches!(pipeline.as_rule(), Rule::pipeline));
+ let input_string = pipeline.as_str().to_string();
Self {
exes: pipeline.into_inner().map(Exe::build_ast).collect(),
+ input_string,
}
}
}
@@ -117,6 +133,7 @@ impl Commands {
}
}
+#[derive(Debug)]
pub struct Error {
input: String,
e: anyhow::Error,
diff --git a/src/pipe.rs b/src/pipe.rs
index 7d35961..3bba117 100644
--- a/src/pipe.rs
+++ b/src/pipe.rs
@@ -1,3 +1,84 @@
-pub fn run() {
- todo!()
+use async_std::io::prelude::ReadExt as _;
+use async_std::os::unix::process::CommandExt as _;
+use async_std::stream::StreamExt as _;
+use std::os::unix::io::{AsRawFd as _, FromRawFd as _};
+use std::os::unix::process::ExitStatusExt as _;
+
+async fn read_pipeline() -> crate::parse::Pipeline {
+ let mut r = unsafe { async_std::fs::File::from_raw_fd(3) };
+ let mut s = String::new();
+ r.read_to_string(&mut s).await.unwrap();
+ crate::parse::Pipeline::parse(&s).unwrap()
+}
+
+pub async fn run() {
+ let pipeline = read_pipeline().await;
+
+ let mut futures = futures_util::stream::FuturesUnordered::new();
+ let mut pg = None;
+ let mut stdin = None;
+ let last = pipeline.exes().len() - 1;
+ for (i, exe) in pipeline.exes().iter().enumerate() {
+ let mut cmd = async_std::process::Command::new(exe.exe());
+ cmd.args(exe.args());
+ if let Some(stdin) = stdin {
+ cmd.stdin(unsafe {
+ async_std::process::Stdio::from_raw_fd(stdin)
+ });
+ }
+ if i < last {
+ let (r, w) =
+ nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).unwrap();
+ stdin = Some(r);
+ cmd.stdout(unsafe { async_std::process::Stdio::from_raw_fd(w) });
+ }
+ let pg_pid = nix::unistd::Pid::from_raw(pg.unwrap_or(0));
+ unsafe {
+ cmd.pre_exec(move || {
+ nix::unistd::setpgid(nix::unistd::Pid::from_raw(0), pg_pid)?;
+ Ok(())
+ });
+ }
+ let child = cmd.spawn().unwrap();
+ let res = nix::unistd::setpgid(
+ nix::unistd::Pid::from_raw(child.id().try_into().unwrap()),
+ pg_pid,
+ );
+ match res {
+ Ok(()) => {}
+ Err(e) => {
+ if e != nix::errno::Errno::EACCES {
+ res.unwrap();
+ }
+ }
+ }
+ if pg.is_none() {
+ pg = Some(child.id().try_into().unwrap());
+ }
+ futures.push(async move {
+ (child.status_no_drop().await.unwrap(), i == last)
+ });
+ }
+
+ let pty = std::fs::File::open("/dev/tty").unwrap();
+ nix::unistd::tcsetpgrp(
+ pty.as_raw_fd(),
+ nix::unistd::Pid::from_raw(pg.unwrap()),
+ )
+ .unwrap();
+
+ let mut final_status = None;
+ while let Some((status, last)) = futures.next().await {
+ if status.signal() == Some(signal_hook::consts::signal::SIGINT) {
+ nix::sys::signal::raise(nix::sys::signal::SIGINT).unwrap();
+ }
+ if last {
+ final_status = Some(status);
+ }
+ }
+ if let Some(code) = final_status.unwrap().code() {
+ std::process::exit(code);
+ } else {
+ std::process::exit(255);
+ }
}
diff --git a/src/state/history/mod.rs b/src/state/history/mod.rs
index ede9120..5a38bc0 100644
--- a/src/state/history/mod.rs
+++ b/src/state/history/mod.rs
@@ -647,9 +647,13 @@ async fn run_pipeline(
let child = cmd.spawn(&env.pty).unwrap();
nix::unistd::close(r).unwrap();
- let w = unsafe { std::fs::File::from_raw_fd(w) };
+ let mut w = unsafe { async_std::fs::File::from_raw_fd(w) };
+ let pipeline = pipeline.clone();
let fut = async move {
- // TODO write data to w
+ w.write_all(pipeline.input_string().as_bytes())
+ .await
+ .unwrap();
+ drop(w);
child.status_no_drop().await.unwrap()
};
run_future(fut, env.clone()).await