summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2022-03-04 18:10:49 -0500
committerJesse Luehrs <doy@tozt.net>2022-03-04 18:10:49 -0500
commite1bfb9bc59a6a97594cb5c2c51cc4ca8ee813a23 (patch)
treea7b9f6982b76e0e56c3cbc2c2909f6e9e04a8582 /src
parentb1c5f2f31874fc019b67ae981f66e0492b22c867 (diff)
downloadnbsh-e1bfb9bc59a6a97594cb5c2c51cc4ca8ee813a23.tar.gz
nbsh-e1bfb9bc59a6a97594cb5c2c51cc4ca8ee813a23.zip
refactor inputs
Diffstat (limited to 'src')
-rw-r--r--src/main.rs2
-rw-r--r--src/runner/mod.rs2
-rw-r--r--src/shell/event.rs6
-rw-r--r--src/shell/history/job.rs152
-rw-r--r--src/shell/history/pty.rs104
-rw-r--r--src/shell/inputs/clock.rs27
-rw-r--r--src/shell/inputs/git.rs (renamed from src/shell/git.rs)76
-rw-r--r--src/shell/inputs/mod.rs32
-rw-r--r--src/shell/inputs/signals.rs30
-rw-r--r--src/shell/inputs/stdin.rs17
-rw-r--r--src/shell/mod.rs130
-rw-r--r--src/shell/readline.rs2
12 files changed, 323 insertions, 257 deletions
diff --git a/src/main.rs b/src/main.rs
index 1ace4d7..b3e2fd5 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -47,7 +47,7 @@ async fn async_main(opt: Opt) -> Result<i32> {
})
});
- return runner::run(command, &mut shell_write).await;
+ return runner::main(command, &mut shell_write).await;
}
shell::main().await
diff --git a/src/runner/mod.rs b/src/runner/mod.rs
index 628b333..ea55b34 100644
--- a/src/runner/mod.rs
+++ b/src/runner/mod.rs
@@ -68,7 +68,7 @@ enum Frame {
For(bool, usize, Vec<String>),
}
-pub async fn run(
+pub async fn main(
commands: String,
shell_write: &mut Option<tokio::fs::File>,
) -> Result<i32> {
diff --git a/src/shell/event.rs b/src/shell/event.rs
index 2b12b05..fe96d5b 100644
--- a/src/shell/event.rs
+++ b/src/shell/event.rs
@@ -8,7 +8,7 @@ pub enum Event {
ChildRunPipeline(usize, (usize, usize)),
ChildSuspend(usize),
ChildExit(usize, Option<Env>),
- GitInfo(Option<super::git::Info>),
+ GitInfo(Option<super::inputs::GitInfo>),
ClockTimer,
}
@@ -43,7 +43,7 @@ impl Reader {
let inner = std::sync::Arc::new(InnerReader::new());
{
let inner = inner.clone();
- tokio::task::spawn(async move {
+ tokio::spawn(async move {
while let Some(event) = input.recv().await {
inner.new_event(Some(event));
}
@@ -95,7 +95,7 @@ struct Pending {
child_run_pipeline: std::collections::VecDeque<(usize, (usize, usize))>,
child_suspend: std::collections::VecDeque<usize>,
child_exit: Option<(usize, Option<Env>)>,
- git_info: Option<Option<super::git::Info>>,
+ git_info: Option<Option<super::inputs::GitInfo>>,
clock_timer: bool,
done: bool,
}
diff --git a/src/shell/history/job.rs b/src/shell/history/job.rs
index 365a06d..d3d112a 100644
--- a/src/shell/history/job.rs
+++ b/src/shell/history/job.rs
@@ -19,7 +19,7 @@ impl Job {
let state = std::sync::Arc::new(std::sync::Mutex::new(
State::Running((0, 0)),
));
- tokio::task::spawn(job_task(
+ tokio::spawn(Self::task(
child,
fh,
std::sync::Arc::clone(&state),
@@ -66,6 +66,83 @@ impl Job {
}
});
}
+
+ async fn task(
+ mut child: tokio::process::Child,
+ fh: std::fs::File,
+ state: std::sync::Arc<std::sync::Mutex<State>>,
+ env: Env,
+ event_w: crate::shell::event::Writer,
+ ) {
+ enum Res {
+ Read(crate::runner::Event),
+ Exit(std::io::Result<std::process::ExitStatus>),
+ }
+
+ let (read_w, read_r) = tokio::sync::mpsc::unbounded_channel();
+ tokio::task::spawn_blocking(move || loop {
+ let event = bincode::deserialize_from(&fh);
+ match event {
+ Ok(event) => {
+ read_w.send(event).unwrap();
+ }
+ Err(e) => {
+ match &*e {
+ bincode::ErrorKind::Io(io_e) => {
+ assert!(
+ io_e.kind()
+ == std::io::ErrorKind::UnexpectedEof
+ );
+ }
+ e => {
+ panic!("{}", e);
+ }
+ }
+ break;
+ }
+ }
+ });
+
+ let mut stream: futures_util::stream::SelectAll<_> = [
+ tokio_stream::wrappers::UnboundedReceiverStream::new(read_r)
+ .map(Res::Read)
+ .boxed(),
+ futures_util::stream::once(child.wait())
+ .map(Res::Exit)
+ .boxed(),
+ ]
+ .into_iter()
+ .collect();
+ let mut exit_status = None;
+ let mut new_env = None;
+ while let Some(res) = stream.next().await {
+ match res {
+ Res::Read(event) => match event {
+ crate::runner::Event::RunPipeline(new_span) => {
+ // we could just update the span in place here, but we
+ // do this as an event so that we can also trigger a
+ // refresh
+ event_w.send(Event::ChildRunPipeline(
+ env.idx(),
+ new_span,
+ ));
+ }
+ crate::runner::Event::Suspend => {
+ event_w.send(Event::ChildSuspend(env.idx()));
+ }
+ crate::runner::Event::Exit(env) => {
+ new_env = Some(env);
+ }
+ },
+ Res::Exit(status) => {
+ exit_status = Some(status.unwrap());
+ }
+ }
+ }
+ *state.lock().unwrap() =
+ State::Exited(ExitInfo::new(exit_status.unwrap()));
+ event_w.send(Event::ChildExit(env.idx(), new_env));
+ }
}
pub enum State {
@@ -108,79 +185,6 @@ impl ExitInfo {
}
}
-async fn job_task(
- mut child: tokio::process::Child,
- fh: std::fs::File,
- state: std::sync::Arc<std::sync::Mutex<State>>,
- env: Env,
- event_w: crate::shell::event::Writer,
-) {
- enum Res {
- Read(crate::runner::Event),
- Exit(std::io::Result<std::process::ExitStatus>),
- }
-
- let (read_w, read_r) = tokio::sync::mpsc::unbounded_channel();
- tokio::task::spawn_blocking(move || loop {
- let event = bincode::deserialize_from(&fh);
- match event {
- Ok(event) => {
- read_w.send(event).unwrap();
- }
- Err(e) => {
- match &*e {
- bincode::ErrorKind::Io(io_e) => {
- assert!(
- io_e.kind() == std::io::ErrorKind::UnexpectedEof
- );
- }
- e => {
- panic!("{}", e);
- }
- }
- break;
- }
- }
- });
-
- let mut stream: futures_util::stream::SelectAll<_> = [
- tokio_stream::wrappers::UnboundedReceiverStream::new(read_r)
- .map(Res::Read)
- .boxed(),
- futures_util::stream::once(child.wait())
- .map(Res::Exit)
- .boxed(),
- ]
- .into_iter()
- .collect();
- let mut exit_status = None;
- let mut new_env = None;
- while let Some(res) = stream.next().await {
- match res {
- Res::Read(event) => match event {
- crate::runner::Event::RunPipeline(new_span) => {
- // we could just update the span in place here, but we do
- // this as an event so that we can also trigger a refresh
- event_w
- .send(Event::ChildRunPipeline(env.idx(), new_span));
- }
- crate::runner::Event::Suspend => {
- event_w.send(Event::ChildSuspend(env.idx()));
- }
- crate::runner::Event::Exit(env) => {
- new_env = Some(env);
- }
- },
- Res::Exit(status) => {
- exit_status = Some(status.unwrap());
- }
- }
- }
- *state.lock().unwrap() =
- State::Exited(ExitInfo::new(exit_status.unwrap()));
- event_w.send(Event::ChildExit(env.idx(), new_env));
-}
-
fn spawn_command(
cmdline: &str,
env: &Env,
diff --git a/src/shell/history/pty.rs b/src/shell/history/pty.rs
index 49681d4..91b9cbb 100644
--- a/src/shell/history/pty.rs
+++ b/src/shell/history/pty.rs
@@ -26,7 +26,7 @@ impl Pty {
super::pty::Vt::new(size),
));
- tokio::task::spawn(pty_task(
+ tokio::spawn(Self::task(
pty,
std::sync::Arc::clone(&vt),
request_r,
@@ -66,6 +66,57 @@ impl Pty {
#[allow(clippy::let_underscore_drop)]
let _ = self.request_w.send(Request::Resize(size.0, size.1));
}
+
+ async fn task(
+ pty: pty_process::Pty,
+ vt: std::sync::Arc<std::sync::Mutex<super::pty::Vt>>,
+ request_r: tokio::sync::mpsc::UnboundedReceiver<Request>,
+ event_w: crate::shell::event::Writer,
+ ) {
+ enum Res {
+ Read(Result<bytes::Bytes, std::io::Error>),
+ Request(Request),
+ }
+
+ let (pty_r, mut pty_w) = pty.into_split();
+ let mut stream: futures_util::stream::SelectAll<_> = [
+ tokio_util::io::ReaderStream::new(pty_r)
+ .map(Res::Read)
+ .boxed(),
+ tokio_stream::wrappers::UnboundedReceiverStream::new(request_r)
+ .map(Res::Request)
+ .boxed(),
+ ]
+ .into_iter()
+ .collect();
+ while let Some(res) = stream.next().await {
+ match res {
+ Res::Read(res) => match res {
+ Ok(bytes) => {
+ vt.lock().unwrap().process(&bytes);
+ event_w.send(Event::PtyOutput);
+ }
+ Err(e) => {
+ // this means that there are no longer any open pts
+ // fds. we could alternately signal this through an
+ // explicit channel at ChildExit time, but this seems
+ // reliable enough.
+ if e.raw_os_error() == Some(libc::EIO) {
+ return;
+ }
+ panic!("pty read failed: {:?}", e);
+ }
+ },
+ Res::Request(Request::Input(bytes)) => {
+ pty_w.write(&bytes).await.unwrap();
+ }
+ Res::Request(Request::Resize(row, col)) => {
+ pty_w.resize(pty_process::Size::new(row, col)).unwrap();
+ vt.lock().unwrap().set_size((row, col));
+ }
+ }
+ }
+ }
}
pub struct Vt {
@@ -161,54 +212,3 @@ impl Vt {
last_row
}
}
-
-async fn pty_task(
- pty: pty_process::Pty,
- vt: std::sync::Arc<std::sync::Mutex<super::pty::Vt>>,
- request_r: tokio::sync::mpsc::UnboundedReceiver<Request>,
- event_w: crate::shell::event::Writer,
-) {
- enum Res {
- Read(Result<bytes::Bytes, std::io::Error>),
- Request(Request),
- }
-
- let (pty_r, mut pty_w) = pty.into_split();
- let mut stream: futures_util::stream::SelectAll<_> = [
- tokio_util::io::ReaderStream::new(pty_r)
- .map(Res::Read)
- .boxed(),
- tokio_stream::wrappers::UnboundedReceiverStream::new(request_r)
- .map(Res::Request)
- .boxed(),
- ]
- .into_iter()
- .collect();
- while let Some(res) = stream.next().await {
- match res {
- Res::Read(res) => match res {
- Ok(bytes) => {
- vt.lock().unwrap().process(&bytes);
- event_w.send(Event::PtyOutput);
- }
- Err(e) => {
- // this means that there are no longer any open pts fds.
- // we could alternately signal this through an explicit
- // channel at ChildExit time, but this seems reliable
- // enough.
- if e.raw_os_error() == Some(libc::EIO) {
- return;
- }
- panic!("pty read failed: {:?}", e);
- }
- },
- Res::Request(Request::Input(bytes)) => {
- pty_w.write(&bytes).await.unwrap();
- }
- Res::Request(Request::Resize(row, col)) => {
- pty_w.resize(pty_process::Size::new(row, col)).unwrap();
- vt.lock().unwrap().set_size((row, col));
- }
- }
- }
-}
diff --git a/src/shell/inputs/clock.rs b/src/shell/inputs/clock.rs
new file mode 100644
index 0000000..250466e
--- /dev/null
+++ b/src/shell/inputs/clock.rs
@@ -0,0 +1,27 @@
+use crate::shell::prelude::*;
+
+pub struct Handler;
+
+impl Handler {
+ pub fn new(event_w: crate::shell::event::Writer) -> Self {
+ tokio::spawn(Self::task(event_w));
+ Self
+ }
+
+ async fn task(event_w: crate::shell::event::Writer) {
+ let now_clock = time::OffsetDateTime::now_utc();
+ let now_instant = tokio::time::Instant::now();
+ let mut interval = tokio::time::interval_at(
+ now_instant
+ + std::time::Duration::from_nanos(
+ 1_000_000_000_u64
+ .saturating_sub(now_clock.nanosecond().into()),
+ ),
+ std::time::Duration::from_secs(1),
+ );
+ loop {
+ interval.tick().await;
+ event_w.send(Event::ClockTimer);
+ }
+ }
+}
diff --git a/src/shell/git.rs b/src/shell/inputs/git.rs
index 48e5eea..1c1c92d 100644
--- a/src/shell/git.rs
+++ b/src/shell/inputs/git.rs
@@ -1,3 +1,79 @@
+use crate::shell::prelude::*;
+
+use notify::Watcher as _;
+
+pub struct Handler {
+ git_w: tokio::sync::mpsc::UnboundedSender<std::path::PathBuf>,
+}
+
+impl Handler {
+ pub fn new(event_w: crate::shell::event::Writer) -> Self {
+ let (git_w, git_r) = tokio::sync::mpsc::unbounded_channel();
+ tokio::spawn(Self::task(git_r, event_w));
+ Self { git_w }
+ }
+
+ pub fn new_dir(&self, path: std::path::PathBuf) {
+ self.git_w.send(path).unwrap();
+ }
+
+ async fn task(
+ mut git_r: tokio::sync::mpsc::UnboundedReceiver<std::path::PathBuf>,
+ event_w: crate::shell::event::Writer,
+ ) {
+ // clippy can't tell that we assign to this later
+ #[allow(clippy::no_effect_underscore_binding)]
+ let mut _active_watcher = None;
+ while let Some(mut dir) = git_r.recv().await {
+ while let Ok(newer_dir) = git_r.try_recv() {
+ dir = newer_dir;
+ }
+ let repo = git2::Repository::discover(&dir).ok();
+ if repo.is_some() {
+ let (sync_watch_w, sync_watch_r) = std::sync::mpsc::channel();
+ let (watch_w, mut watch_r) =
+ tokio::sync::mpsc::unbounded_channel();
+ let mut watcher = notify::RecommendedWatcher::new(
+ sync_watch_w,
+ std::time::Duration::from_millis(100),
+ )
+ .unwrap();
+ watcher
+ .watch(&dir, notify::RecursiveMode::Recursive)
+ .unwrap();
+ tokio::task::spawn_blocking(move || {
+ while let Ok(event) = sync_watch_r.recv() {
+ if watch_w.send(event).is_err() {
+ break;
+ }
+ }
+ });
+ let event_w = event_w.clone();
+ tokio::spawn(async move {
+ while watch_r.recv().await.is_some() {
+ let repo = git2::Repository::discover(&dir).ok();
+ let info = tokio::task::spawn_blocking(|| {
+ repo.map(|repo| Info::new(&repo))
+ })
+ .await
+ .unwrap();
+ event_w.send(Event::GitInfo(info));
+ }
+ });
+ _active_watcher = Some(watcher);
+ } else {
+ _active_watcher = None;
+ }
+ let info = tokio::task::spawn_blocking(|| {
+ repo.map(|repo| Info::new(&repo))
+ })
+ .await
+ .unwrap();
+ event_w.send(Event::GitInfo(info));
+ }
+ }
+}
+
#[derive(Debug)]
pub struct Info {
modified_files: bool,
diff --git a/src/shell/inputs/mod.rs b/src/shell/inputs/mod.rs
new file mode 100644
index 0000000..48590a2
--- /dev/null
+++ b/src/shell/inputs/mod.rs
@@ -0,0 +1,32 @@
+use crate::shell::prelude::*;
+
+mod clock;
+mod git;
+pub use git::Info as GitInfo;
+mod signals;
+mod stdin;
+
+pub struct Handler {
+ _clock: clock::Handler,
+ git: git::Handler,
+ _signals: signals::Handler,
+ _stdin: stdin::Handler,
+}
+
+impl Handler {
+ pub fn new(
+ input: textmode::blocking::Input,
+ event_w: crate::shell::event::Writer,
+ ) -> Result<Self> {
+ Ok(Self {
+ _clock: clock::Handler::new(event_w.clone()),
+ git: git::Handler::new(event_w.clone()),
+ _signals: signals::Handler::new(event_w.clone())?,
+ _stdin: stdin::Handler::new(input, event_w),
+ })
+ }
+
+ pub fn new_dir(&self, path: std::path::PathBuf) {
+ self.git.new_dir(path);
+ }
+}
diff --git a/src/shell/inputs/signals.rs b/src/shell/inputs/signals.rs
new file mode 100644
index 0000000..4b91273
--- /dev/null
+++ b/src/shell/inputs/signals.rs
@@ -0,0 +1,30 @@
+use crate::shell::prelude::*;
+
+pub struct Handler;
+
+impl Handler {
+ pub fn new(event_w: crate::shell::event::Writer) -> Result<Self> {
+ let signals = tokio::signal::unix::signal(
+ tokio::signal::unix::SignalKind::window_change(),
+ )?;
+ tokio::spawn(Self::task(signals, event_w));
+ Ok(Self)
+ }
+
+ async fn task(
+ mut signals: tokio::signal::unix::Signal,
+ event_w: crate::shell::event::Writer,
+ ) {
+ event_w.send(resize_event());
+ while signals.recv().await.is_some() {
+ event_w.send(resize_event());
+ }
+ }
+}
+
+fn resize_event() -> Event {
+ Event::Resize(terminal_size::terminal_size().map_or(
+ (24, 80),
+ |(terminal_size::Width(w), terminal_size::Height(h))| (h, w),
+ ))
+}
diff --git a/src/shell/inputs/stdin.rs b/src/shell/inputs/stdin.rs
new file mode 100644
index 0000000..b966307
--- /dev/null
+++ b/src/shell/inputs/stdin.rs
@@ -0,0 +1,17 @@
+use crate::shell::prelude::*;
+
+pub struct Handler;
+
+impl Handler {
+ pub fn new(
+ mut input: textmode::blocking::Input,
+ event_w: crate::shell::event::Writer,
+ ) -> Self {
+ std::thread::spawn(move || {
+ while let Some(key) = input.read_key().unwrap() {
+ event_w.send(Event::Key(key));
+ }
+ });
+ Self
+ }
+}
diff --git a/src/shell/mod.rs b/src/shell/mod.rs
index b23ab1e..0f42cde 100644
--- a/src/shell/mod.rs
+++ b/src/shell/mod.rs
@@ -1,11 +1,10 @@
use crate::shell::prelude::*;
-use notify::Watcher as _;
use textmode::Textmode as _;
mod event;
-mod git;
mod history;
+mod inputs;
mod prelude;
mod readline;
@@ -20,135 +19,16 @@ pub async fn main() -> Result<i32> {
let (event_w, event_r) = event::channel();
- {
- let mut signals = tokio::signal::unix::signal(
- tokio::signal::unix::SignalKind::window_change(),
- )?;
- let event_w = event_w.clone();
- tokio::task::spawn(async move {
- event_w.send(Event::Resize(
- terminal_size::terminal_size().map_or(
- (24, 80),
- |(terminal_size::Width(w), terminal_size::Height(h))| {
- (h, w)
- },
- ),
- ));
- while signals.recv().await.is_some() {
- event_w.send(Event::Resize(
- terminal_size::terminal_size().map_or(
- (24, 80),
- |(
- terminal_size::Width(w),
- terminal_size::Height(h),
- )| { (h, w) },
- ),
- ));
- }
- });
- }
-
- {
- let event_w = event_w.clone();
- std::thread::spawn(move || {
- while let Some(key) = input.read_key().unwrap() {
- event_w.send(Event::Key(key));
- }
- });
- }
-
- // redraw the clock every second
- {
- let event_w = event_w.clone();
- tokio::task::spawn(async move {
- let now_clock = time::OffsetDateTime::now_utc();
- let now_instant = tokio::time::Instant::now();
- let mut interval = tokio::time::interval_at(
- now_instant
- + std::time::Duration::from_nanos(
- 1_000_000_000_u64
- .saturating_sub(now_clock.nanosecond().into()),
- ),
- std::time::Duration::from_secs(1),
- );
- loop {
- interval.tick().await;
- event_w.send(Event::ClockTimer);
- }
- });
- }
-
- let (git_w, mut git_r): (
- tokio::sync::mpsc::UnboundedSender<std::path::PathBuf>,
- _,
- ) = tokio::sync::mpsc::unbounded_channel();
- {
- let event_w = event_w.clone();
- // clippy can't tell that we assign to this later
- #[allow(clippy::no_effect_underscore_binding)]
- let mut _active_watcher = None;
- tokio::task::spawn(async move {
- while let Some(mut dir) = git_r.recv().await {
- while let Ok(newer_dir) = git_r.try_recv() {
- dir = newer_dir;
- }
- let repo = git2::Repository::discover(&dir).ok();
- if repo.is_some() {
- let (sync_watch_w, sync_watch_r) =
- std::sync::mpsc::channel();
- let (watch_w, mut watch_r) =
- tokio::sync::mpsc::unbounded_channel();
- let mut watcher = notify::RecommendedWatcher::new(
- sync_watch_w,
- std::time::Duration::from_millis(100),
- )
- .unwrap();
- watcher
- .watch(&dir, notify::RecursiveMode::Recursive)
- .unwrap();
- tokio::task::spawn_blocking(move || {
- while let Ok(event) = sync_watch_r.recv() {
- let watch_w = watch_w.clone();
- let send_failed = watch_w.send(event).is_err();
- if send_failed {
- break;
- }
- }
- });
- let event_w = event_w.clone();
- tokio::task::spawn(async move {
- while watch_r.recv().await.is_some() {
- let repo = git2::Repository::discover(&dir).ok();
- let info = tokio::task::spawn_blocking(|| {
- repo.map(|repo| git::Info::new(&repo))
- })
- .await
- .unwrap();
- event_w.send(Event::GitInfo(info));
- }
- });
- _active_watcher = Some(watcher);
- } else {
- _active_watcher = None;
- }
- let info = tokio::task::spawn_blocking(|| {
- repo.map(|repo| git::Info::new(&repo))
- })
- .await
- .unwrap();
- event_w.send(Event::GitInfo(info));
- }
- });
- }
+ let inputs = inputs::Handler::new(input, event_w.clone()).unwrap();
let mut shell = Shell::new(crate::info::get_offset())?;
let mut prev_dir = shell.env.pwd().to_path_buf();
- git_w.send(prev_dir.clone()).unwrap();
+ inputs.new_dir(prev_dir.clone());
while let Some(event) = event_r.recv().await {
let dir = shell.env().pwd();
if dir != prev_dir {
prev_dir = dir.to_path_buf();
- git_w.send(dir.to_path_buf()).unwrap();
+ inputs.new_dir(dir.to_path_buf());
}
match shell.handle_event(event, &event_w) {
Some(Action::Refresh) => {
@@ -196,7 +76,7 @@ pub struct Shell {
readline: readline::Readline,
history: history::History,
env: Env,
- git: Option<git::Info>,
+ git: Option<inputs::GitInfo>,
focus: Focus,
scene: Scene,
escape: bool,
diff --git a/src/shell/readline.rs b/src/shell/readline.rs
index 5de9901..654d264 100644
--- a/src/shell/readline.rs
+++ b/src/shell/readline.rs
@@ -23,7 +23,7 @@ impl Readline {
&self,
out: &mut impl textmode::Textmode,
env: &Env,
- git: Option<&super::git::Info>,
+ git: Option<&super::inputs::GitInfo>,
focus: bool,
offset: time::UtcOffset,
) -> Result<()> {