diff options
author | Jesse Luehrs <doy@tozt.net> | 2022-03-04 18:10:49 -0500 |
---|---|---|
committer | Jesse Luehrs <doy@tozt.net> | 2022-03-04 18:10:49 -0500 |
commit | e1bfb9bc59a6a97594cb5c2c51cc4ca8ee813a23 (patch) | |
tree | a7b9f6982b76e0e56c3cbc2c2909f6e9e04a8582 | |
parent | b1c5f2f31874fc019b67ae981f66e0492b22c867 (diff) | |
download | nbsh-e1bfb9bc59a6a97594cb5c2c51cc4ca8ee813a23.tar.gz nbsh-e1bfb9bc59a6a97594cb5c2c51cc4ca8ee813a23.zip |
refactor inputs
-rw-r--r-- | src/main.rs | 2 | ||||
-rw-r--r-- | src/runner/mod.rs | 2 | ||||
-rw-r--r-- | src/shell/event.rs | 6 | ||||
-rw-r--r-- | src/shell/history/job.rs | 152 | ||||
-rw-r--r-- | src/shell/history/pty.rs | 104 | ||||
-rw-r--r-- | src/shell/inputs/clock.rs | 27 | ||||
-rw-r--r-- | src/shell/inputs/git.rs (renamed from src/shell/git.rs) | 76 | ||||
-rw-r--r-- | src/shell/inputs/mod.rs | 32 | ||||
-rw-r--r-- | src/shell/inputs/signals.rs | 30 | ||||
-rw-r--r-- | src/shell/inputs/stdin.rs | 17 | ||||
-rw-r--r-- | src/shell/mod.rs | 130 | ||||
-rw-r--r-- | src/shell/readline.rs | 2 |
12 files changed, 323 insertions, 257 deletions
diff --git a/src/main.rs b/src/main.rs index 1ace4d7..b3e2fd5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -47,7 +47,7 @@ async fn async_main(opt: Opt) -> Result<i32> { }) }); - return runner::run(command, &mut shell_write).await; + return runner::main(command, &mut shell_write).await; } shell::main().await diff --git a/src/runner/mod.rs b/src/runner/mod.rs index 628b333..ea55b34 100644 --- a/src/runner/mod.rs +++ b/src/runner/mod.rs @@ -68,7 +68,7 @@ enum Frame { For(bool, usize, Vec<String>), } -pub async fn run( +pub async fn main( commands: String, shell_write: &mut Option<tokio::fs::File>, ) -> Result<i32> { diff --git a/src/shell/event.rs b/src/shell/event.rs index 2b12b05..fe96d5b 100644 --- a/src/shell/event.rs +++ b/src/shell/event.rs @@ -8,7 +8,7 @@ pub enum Event { ChildRunPipeline(usize, (usize, usize)), ChildSuspend(usize), ChildExit(usize, Option<Env>), - GitInfo(Option<super::git::Info>), + GitInfo(Option<super::inputs::GitInfo>), ClockTimer, } @@ -43,7 +43,7 @@ impl Reader { let inner = std::sync::Arc::new(InnerReader::new()); { let inner = inner.clone(); - tokio::task::spawn(async move { + tokio::spawn(async move { while let Some(event) = input.recv().await { inner.new_event(Some(event)); } @@ -95,7 +95,7 @@ struct Pending { child_run_pipeline: std::collections::VecDeque<(usize, (usize, usize))>, child_suspend: std::collections::VecDeque<usize>, child_exit: Option<(usize, Option<Env>)>, - git_info: Option<Option<super::git::Info>>, + git_info: Option<Option<super::inputs::GitInfo>>, clock_timer: bool, done: bool, } diff --git a/src/shell/history/job.rs b/src/shell/history/job.rs index 365a06d..d3d112a 100644 --- a/src/shell/history/job.rs +++ b/src/shell/history/job.rs @@ -19,7 +19,7 @@ impl Job { let state = std::sync::Arc::new(std::sync::Mutex::new( State::Running((0, 0)), )); - tokio::task::spawn(job_task( + tokio::spawn(Self::task( child, fh, std::sync::Arc::clone(&state), @@ -66,6 +66,83 @@ impl Job { } }); } + + async fn task( + mut child: tokio::process::Child, + fh: std::fs::File, + state: std::sync::Arc<std::sync::Mutex<State>>, + env: Env, + event_w: crate::shell::event::Writer, + ) { + enum Res { + Read(crate::runner::Event), + Exit(std::io::Result<std::process::ExitStatus>), + } + + let (read_w, read_r) = tokio::sync::mpsc::unbounded_channel(); + tokio::task::spawn_blocking(move || loop { + let event = bincode::deserialize_from(&fh); + match event { + Ok(event) => { + read_w.send(event).unwrap(); + } + Err(e) => { + match &*e { + bincode::ErrorKind::Io(io_e) => { + assert!( + io_e.kind() + == std::io::ErrorKind::UnexpectedEof + ); + } + e => { + panic!("{}", e); + } + } + break; + } + } + }); + + let mut stream: futures_util::stream::SelectAll<_> = [ + tokio_stream::wrappers::UnboundedReceiverStream::new(read_r) + .map(Res::Read) + .boxed(), + futures_util::stream::once(child.wait()) + .map(Res::Exit) + .boxed(), + ] + .into_iter() + .collect(); + let mut exit_status = None; + let mut new_env = None; + while let Some(res) = stream.next().await { + match res { + Res::Read(event) => match event { + crate::runner::Event::RunPipeline(new_span) => { + // we could just update the span in place here, but we + // do this as an event so that we can also trigger a + // refresh + event_w.send(Event::ChildRunPipeline( + env.idx(), + new_span, + )); + } + crate::runner::Event::Suspend => { + event_w.send(Event::ChildSuspend(env.idx())); + } + crate::runner::Event::Exit(env) => { + new_env = Some(env); + } + }, + Res::Exit(status) => { + exit_status = Some(status.unwrap()); + } + } + } + *state.lock().unwrap() = + State::Exited(ExitInfo::new(exit_status.unwrap())); + event_w.send(Event::ChildExit(env.idx(), new_env)); + } } pub enum State { @@ -108,79 +185,6 @@ impl ExitInfo { } } -async fn job_task( - mut child: tokio::process::Child, - fh: std::fs::File, - state: std::sync::Arc<std::sync::Mutex<State>>, - env: Env, - event_w: crate::shell::event::Writer, -) { - enum Res { - Read(crate::runner::Event), - Exit(std::io::Result<std::process::ExitStatus>), - } - - let (read_w, read_r) = tokio::sync::mpsc::unbounded_channel(); - tokio::task::spawn_blocking(move || loop { - let event = bincode::deserialize_from(&fh); - match event { - Ok(event) => { - read_w.send(event).unwrap(); - } - Err(e) => { - match &*e { - bincode::ErrorKind::Io(io_e) => { - assert!( - io_e.kind() == std::io::ErrorKind::UnexpectedEof - ); - } - e => { - panic!("{}", e); - } - } - break; - } - } - }); - - let mut stream: futures_util::stream::SelectAll<_> = [ - tokio_stream::wrappers::UnboundedReceiverStream::new(read_r) - .map(Res::Read) - .boxed(), - futures_util::stream::once(child.wait()) - .map(Res::Exit) - .boxed(), - ] - .into_iter() - .collect(); - let mut exit_status = None; - let mut new_env = None; - while let Some(res) = stream.next().await { - match res { - Res::Read(event) => match event { - crate::runner::Event::RunPipeline(new_span) => { - // we could just update the span in place here, but we do - // this as an event so that we can also trigger a refresh - event_w - .send(Event::ChildRunPipeline(env.idx(), new_span)); - } - crate::runner::Event::Suspend => { - event_w.send(Event::ChildSuspend(env.idx())); - } - crate::runner::Event::Exit(env) => { - new_env = Some(env); - } - }, - Res::Exit(status) => { - exit_status = Some(status.unwrap()); - } - } - } - *state.lock().unwrap() = - State::Exited(ExitInfo::new(exit_status.unwrap())); - event_w.send(Event::ChildExit(env.idx(), new_env)); -} - fn spawn_command( cmdline: &str, env: &Env, diff --git a/src/shell/history/pty.rs b/src/shell/history/pty.rs index 49681d4..91b9cbb 100644 --- a/src/shell/history/pty.rs +++ b/src/shell/history/pty.rs @@ -26,7 +26,7 @@ impl Pty { super::pty::Vt::new(size), )); - tokio::task::spawn(pty_task( + tokio::spawn(Self::task( pty, std::sync::Arc::clone(&vt), request_r, @@ -66,6 +66,57 @@ impl Pty { #[allow(clippy::let_underscore_drop)] let _ = self.request_w.send(Request::Resize(size.0, size.1)); } + + async fn task( + pty: pty_process::Pty, + vt: std::sync::Arc<std::sync::Mutex<super::pty::Vt>>, + request_r: tokio::sync::mpsc::UnboundedReceiver<Request>, + event_w: crate::shell::event::Writer, + ) { + enum Res { + Read(Result<bytes::Bytes, std::io::Error>), + Request(Request), + } + + let (pty_r, mut pty_w) = pty.into_split(); + let mut stream: futures_util::stream::SelectAll<_> = [ + tokio_util::io::ReaderStream::new(pty_r) + .map(Res::Read) + .boxed(), + tokio_stream::wrappers::UnboundedReceiverStream::new(request_r) + .map(Res::Request) + .boxed(), + ] + .into_iter() + .collect(); + while let Some(res) = stream.next().await { + match res { + Res::Read(res) => match res { + Ok(bytes) => { + vt.lock().unwrap().process(&bytes); + event_w.send(Event::PtyOutput); + } + Err(e) => { + // this means that there are no longer any open pts + // fds. we could alternately signal this through an + // explicit channel at ChildExit time, but this seems + // reliable enough. + if e.raw_os_error() == Some(libc::EIO) { + return; + } + panic!("pty read failed: {:?}", e); + } + }, + Res::Request(Request::Input(bytes)) => { + pty_w.write(&bytes).await.unwrap(); + } + Res::Request(Request::Resize(row, col)) => { + pty_w.resize(pty_process::Size::new(row, col)).unwrap(); + vt.lock().unwrap().set_size((row, col)); + } + } + } + } } pub struct Vt { @@ -161,54 +212,3 @@ impl Vt { last_row } } - -async fn pty_task( - pty: pty_process::Pty, - vt: std::sync::Arc<std::sync::Mutex<super::pty::Vt>>, - request_r: tokio::sync::mpsc::UnboundedReceiver<Request>, - event_w: crate::shell::event::Writer, -) { - enum Res { - Read(Result<bytes::Bytes, std::io::Error>), - Request(Request), - } - - let (pty_r, mut pty_w) = pty.into_split(); - let mut stream: futures_util::stream::SelectAll<_> = [ - tokio_util::io::ReaderStream::new(pty_r) - .map(Res::Read) - .boxed(), - tokio_stream::wrappers::UnboundedReceiverStream::new(request_r) - .map(Res::Request) - .boxed(), - ] - .into_iter() - .collect(); - while let Some(res) = stream.next().await { - match res { - Res::Read(res) => match res { - Ok(bytes) => { - vt.lock().unwrap().process(&bytes); - event_w.send(Event::PtyOutput); - } - Err(e) => { - // this means that there are no longer any open pts fds. - // we could alternately signal this through an explicit - // channel at ChildExit time, but this seems reliable - // enough. - if e.raw_os_error() == Some(libc::EIO) { - return; - } - panic!("pty read failed: {:?}", e); - } - }, - Res::Request(Request::Input(bytes)) => { - pty_w.write(&bytes).await.unwrap(); - } - Res::Request(Request::Resize(row, col)) => { - pty_w.resize(pty_process::Size::new(row, col)).unwrap(); - vt.lock().unwrap().set_size((row, col)); - } - } - } -} diff --git a/src/shell/inputs/clock.rs b/src/shell/inputs/clock.rs new file mode 100644 index 0000000..250466e --- /dev/null +++ b/src/shell/inputs/clock.rs @@ -0,0 +1,27 @@ +use crate::shell::prelude::*; + +pub struct Handler; + +impl Handler { + pub fn new(event_w: crate::shell::event::Writer) -> Self { + tokio::spawn(Self::task(event_w)); + Self + } + + async fn task(event_w: crate::shell::event::Writer) { + let now_clock = time::OffsetDateTime::now_utc(); + let now_instant = tokio::time::Instant::now(); + let mut interval = tokio::time::interval_at( + now_instant + + std::time::Duration::from_nanos( + 1_000_000_000_u64 + .saturating_sub(now_clock.nanosecond().into()), + ), + std::time::Duration::from_secs(1), + ); + loop { + interval.tick().await; + event_w.send(Event::ClockTimer); + } + } +} diff --git a/src/shell/git.rs b/src/shell/inputs/git.rs index 48e5eea..1c1c92d 100644 --- a/src/shell/git.rs +++ b/src/shell/inputs/git.rs @@ -1,3 +1,79 @@ +use crate::shell::prelude::*; + +use notify::Watcher as _; + +pub struct Handler { + git_w: tokio::sync::mpsc::UnboundedSender<std::path::PathBuf>, +} + +impl Handler { + pub fn new(event_w: crate::shell::event::Writer) -> Self { + let (git_w, git_r) = tokio::sync::mpsc::unbounded_channel(); + tokio::spawn(Self::task(git_r, event_w)); + Self { git_w } + } + + pub fn new_dir(&self, path: std::path::PathBuf) { + self.git_w.send(path).unwrap(); + } + + async fn task( + mut git_r: tokio::sync::mpsc::UnboundedReceiver<std::path::PathBuf>, + event_w: crate::shell::event::Writer, + ) { + // clippy can't tell that we assign to this later + #[allow(clippy::no_effect_underscore_binding)] + let mut _active_watcher = None; + while let Some(mut dir) = git_r.recv().await { + while let Ok(newer_dir) = git_r.try_recv() { + dir = newer_dir; + } + let repo = git2::Repository::discover(&dir).ok(); + if repo.is_some() { + let (sync_watch_w, sync_watch_r) = std::sync::mpsc::channel(); + let (watch_w, mut watch_r) = + tokio::sync::mpsc::unbounded_channel(); + let mut watcher = notify::RecommendedWatcher::new( + sync_watch_w, + std::time::Duration::from_millis(100), + ) + .unwrap(); + watcher + .watch(&dir, notify::RecursiveMode::Recursive) + .unwrap(); + tokio::task::spawn_blocking(move || { + while let Ok(event) = sync_watch_r.recv() { + if watch_w.send(event).is_err() { + break; + } + } + }); + let event_w = event_w.clone(); + tokio::spawn(async move { + while watch_r.recv().await.is_some() { + let repo = git2::Repository::discover(&dir).ok(); + let info = tokio::task::spawn_blocking(|| { + repo.map(|repo| Info::new(&repo)) + }) + .await + .unwrap(); + event_w.send(Event::GitInfo(info)); + } + }); + _active_watcher = Some(watcher); + } else { + _active_watcher = None; + } + let info = tokio::task::spawn_blocking(|| { + repo.map(|repo| Info::new(&repo)) + }) + .await + .unwrap(); + event_w.send(Event::GitInfo(info)); + } + } +} + #[derive(Debug)] pub struct Info { modified_files: bool, diff --git a/src/shell/inputs/mod.rs b/src/shell/inputs/mod.rs new file mode 100644 index 0000000..48590a2 --- /dev/null +++ b/src/shell/inputs/mod.rs @@ -0,0 +1,32 @@ +use crate::shell::prelude::*; + +mod clock; +mod git; +pub use git::Info as GitInfo; +mod signals; +mod stdin; + +pub struct Handler { + _clock: clock::Handler, + git: git::Handler, + _signals: signals::Handler, + _stdin: stdin::Handler, +} + +impl Handler { + pub fn new( + input: textmode::blocking::Input, + event_w: crate::shell::event::Writer, + ) -> Result<Self> { + Ok(Self { + _clock: clock::Handler::new(event_w.clone()), + git: git::Handler::new(event_w.clone()), + _signals: signals::Handler::new(event_w.clone())?, + _stdin: stdin::Handler::new(input, event_w), + }) + } + + pub fn new_dir(&self, path: std::path::PathBuf) { + self.git.new_dir(path); + } +} diff --git a/src/shell/inputs/signals.rs b/src/shell/inputs/signals.rs new file mode 100644 index 0000000..4b91273 --- /dev/null +++ b/src/shell/inputs/signals.rs @@ -0,0 +1,30 @@ +use crate::shell::prelude::*; + +pub struct Handler; + +impl Handler { + pub fn new(event_w: crate::shell::event::Writer) -> Result<Self> { + let signals = tokio::signal::unix::signal( + tokio::signal::unix::SignalKind::window_change(), + )?; + tokio::spawn(Self::task(signals, event_w)); + Ok(Self) + } + + async fn task( + mut signals: tokio::signal::unix::Signal, + event_w: crate::shell::event::Writer, + ) { + event_w.send(resize_event()); + while signals.recv().await.is_some() { + event_w.send(resize_event()); + } + } +} + +fn resize_event() -> Event { + Event::Resize(terminal_size::terminal_size().map_or( + (24, 80), + |(terminal_size::Width(w), terminal_size::Height(h))| (h, w), + )) +} diff --git a/src/shell/inputs/stdin.rs b/src/shell/inputs/stdin.rs new file mode 100644 index 0000000..b966307 --- /dev/null +++ b/src/shell/inputs/stdin.rs @@ -0,0 +1,17 @@ +use crate::shell::prelude::*; + +pub struct Handler; + +impl Handler { + pub fn new( + mut input: textmode::blocking::Input, + event_w: crate::shell::event::Writer, + ) -> Self { + std::thread::spawn(move || { + while let Some(key) = input.read_key().unwrap() { + event_w.send(Event::Key(key)); + } + }); + Self + } +} diff --git a/src/shell/mod.rs b/src/shell/mod.rs index b23ab1e..0f42cde 100644 --- a/src/shell/mod.rs +++ b/src/shell/mod.rs @@ -1,11 +1,10 @@ use crate::shell::prelude::*; -use notify::Watcher as _; use textmode::Textmode as _; mod event; -mod git; mod history; +mod inputs; mod prelude; mod readline; @@ -20,135 +19,16 @@ pub async fn main() -> Result<i32> { let (event_w, event_r) = event::channel(); - { - let mut signals = tokio::signal::unix::signal( - tokio::signal::unix::SignalKind::window_change(), - )?; - let event_w = event_w.clone(); - tokio::task::spawn(async move { - event_w.send(Event::Resize( - terminal_size::terminal_size().map_or( - (24, 80), - |(terminal_size::Width(w), terminal_size::Height(h))| { - (h, w) - }, - ), - )); - while signals.recv().await.is_some() { - event_w.send(Event::Resize( - terminal_size::terminal_size().map_or( - (24, 80), - |( - terminal_size::Width(w), - terminal_size::Height(h), - )| { (h, w) }, - ), - )); - } - }); - } - - { - let event_w = event_w.clone(); - std::thread::spawn(move || { - while let Some(key) = input.read_key().unwrap() { - event_w.send(Event::Key(key)); - } - }); - } - - // redraw the clock every second - { - let event_w = event_w.clone(); - tokio::task::spawn(async move { - let now_clock = time::OffsetDateTime::now_utc(); - let now_instant = tokio::time::Instant::now(); - let mut interval = tokio::time::interval_at( - now_instant - + std::time::Duration::from_nanos( - 1_000_000_000_u64 - .saturating_sub(now_clock.nanosecond().into()), - ), - std::time::Duration::from_secs(1), - ); - loop { - interval.tick().await; - event_w.send(Event::ClockTimer); - } - }); - } - - let (git_w, mut git_r): ( - tokio::sync::mpsc::UnboundedSender<std::path::PathBuf>, - _, - ) = tokio::sync::mpsc::unbounded_channel(); - { - let event_w = event_w.clone(); - // clippy can't tell that we assign to this later - #[allow(clippy::no_effect_underscore_binding)] - let mut _active_watcher = None; - tokio::task::spawn(async move { - while let Some(mut dir) = git_r.recv().await { - while let Ok(newer_dir) = git_r.try_recv() { - dir = newer_dir; - } - let repo = git2::Repository::discover(&dir).ok(); - if repo.is_some() { - let (sync_watch_w, sync_watch_r) = - std::sync::mpsc::channel(); - let (watch_w, mut watch_r) = - tokio::sync::mpsc::unbounded_channel(); - let mut watcher = notify::RecommendedWatcher::new( - sync_watch_w, - std::time::Duration::from_millis(100), - ) - .unwrap(); - watcher - .watch(&dir, notify::RecursiveMode::Recursive) - .unwrap(); - tokio::task::spawn_blocking(move || { - while let Ok(event) = sync_watch_r.recv() { - let watch_w = watch_w.clone(); - let send_failed = watch_w.send(event).is_err(); - if send_failed { - break; - } - } - }); - let event_w = event_w.clone(); - tokio::task::spawn(async move { - while watch_r.recv().await.is_some() { - let repo = git2::Repository::discover(&dir).ok(); - let info = tokio::task::spawn_blocking(|| { - repo.map(|repo| git::Info::new(&repo)) - }) - .await - .unwrap(); - event_w.send(Event::GitInfo(info)); - } - }); - _active_watcher = Some(watcher); - } else { - _active_watcher = None; - } - let info = tokio::task::spawn_blocking(|| { - repo.map(|repo| git::Info::new(&repo)) - }) - .await - .unwrap(); - event_w.send(Event::GitInfo(info)); - } - }); - } + let inputs = inputs::Handler::new(input, event_w.clone()).unwrap(); let mut shell = Shell::new(crate::info::get_offset())?; let mut prev_dir = shell.env.pwd().to_path_buf(); - git_w.send(prev_dir.clone()).unwrap(); + inputs.new_dir(prev_dir.clone()); while let Some(event) = event_r.recv().await { let dir = shell.env().pwd(); if dir != prev_dir { prev_dir = dir.to_path_buf(); - git_w.send(dir.to_path_buf()).unwrap(); + inputs.new_dir(dir.to_path_buf()); } match shell.handle_event(event, &event_w) { Some(Action::Refresh) => { @@ -196,7 +76,7 @@ pub struct Shell { readline: readline::Readline, history: history::History, env: Env, - git: Option<git::Info>, + git: Option<inputs::GitInfo>, focus: Focus, scene: Scene, escape: bool, diff --git a/src/shell/readline.rs b/src/shell/readline.rs index 5de9901..654d264 100644 --- a/src/shell/readline.rs +++ b/src/shell/readline.rs @@ -23,7 +23,7 @@ impl Readline { &self, out: &mut impl textmode::Textmode, env: &Env, - git: Option<&super::git::Info>, + git: Option<&super::inputs::GitInfo>, focus: bool, offset: time::UtcOffset, ) -> Result<()> { |