From e1bfb9bc59a6a97594cb5c2c51cc4ca8ee813a23 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Fri, 4 Mar 2022 18:10:49 -0500 Subject: refactor inputs --- src/main.rs | 2 +- src/runner/mod.rs | 2 +- src/shell/event.rs | 6 +- src/shell/git.rs | 201 -------------------------------- src/shell/history/job.rs | 152 ++++++++++++------------ src/shell/history/pty.rs | 104 ++++++++--------- src/shell/inputs/clock.rs | 27 +++++ src/shell/inputs/git.rs | 277 ++++++++++++++++++++++++++++++++++++++++++++ src/shell/inputs/mod.rs | 32 +++++ src/shell/inputs/signals.rs | 30 +++++ src/shell/inputs/stdin.rs | 17 +++ src/shell/mod.rs | 130 +-------------------- src/shell/readline.rs | 2 +- 13 files changed, 524 insertions(+), 458 deletions(-) delete mode 100644 src/shell/git.rs create mode 100644 src/shell/inputs/clock.rs create mode 100644 src/shell/inputs/git.rs create mode 100644 src/shell/inputs/mod.rs create mode 100644 src/shell/inputs/signals.rs create mode 100644 src/shell/inputs/stdin.rs (limited to 'src') diff --git a/src/main.rs b/src/main.rs index 1ace4d7..b3e2fd5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -47,7 +47,7 @@ async fn async_main(opt: Opt) -> Result { }) }); - return runner::run(command, &mut shell_write).await; + return runner::main(command, &mut shell_write).await; } shell::main().await diff --git a/src/runner/mod.rs b/src/runner/mod.rs index 628b333..ea55b34 100644 --- a/src/runner/mod.rs +++ b/src/runner/mod.rs @@ -68,7 +68,7 @@ enum Frame { For(bool, usize, Vec), } -pub async fn run( +pub async fn main( commands: String, shell_write: &mut Option, ) -> Result { diff --git a/src/shell/event.rs b/src/shell/event.rs index 2b12b05..fe96d5b 100644 --- a/src/shell/event.rs +++ b/src/shell/event.rs @@ -8,7 +8,7 @@ pub enum Event { ChildRunPipeline(usize, (usize, usize)), ChildSuspend(usize), ChildExit(usize, Option), - GitInfo(Option), + GitInfo(Option), ClockTimer, } @@ -43,7 +43,7 @@ impl Reader { let inner = std::sync::Arc::new(InnerReader::new()); { let inner = inner.clone(); - tokio::task::spawn(async move { + tokio::spawn(async move { while let Some(event) = input.recv().await { inner.new_event(Some(event)); } @@ -95,7 +95,7 @@ struct Pending { child_run_pipeline: std::collections::VecDeque<(usize, (usize, usize))>, child_suspend: std::collections::VecDeque, child_exit: Option<(usize, Option)>, - git_info: Option>, + git_info: Option>, clock_timer: bool, done: bool, } diff --git a/src/shell/git.rs b/src/shell/git.rs deleted file mode 100644 index 48e5eea..0000000 --- a/src/shell/git.rs +++ /dev/null @@ -1,201 +0,0 @@ -#[derive(Debug)] -pub struct Info { - modified_files: bool, - staged_files: bool, - new_files: bool, - commits: bool, - active_operation: ActiveOperation, - branch: Option, - remote_branch_diff: Option<(usize, usize)>, -} - -const MODIFIED: git2::Status = git2::Status::WT_DELETED - .union(git2::Status::WT_MODIFIED) - .union(git2::Status::WT_RENAMED) - .union(git2::Status::WT_TYPECHANGE) - .union(git2::Status::CONFLICTED); -const STAGED: git2::Status = git2::Status::INDEX_DELETED - .union(git2::Status::INDEX_MODIFIED) - .union(git2::Status::INDEX_NEW) - .union(git2::Status::INDEX_RENAMED) - .union(git2::Status::INDEX_TYPECHANGE); -const NEW: git2::Status = git2::Status::WT_NEW; - -impl Info { - pub fn new(git: &git2::Repository) -> Self { - let mut status_options = git2::StatusOptions::new(); - status_options.include_untracked(true); - status_options.update_index(true); - - let statuses = git.statuses(Some(&mut status_options)); - - let mut modified_files = false; - let mut staged_files = false; - let mut new_files = false; - if let Ok(statuses) = statuses { - for file in statuses.iter() { - if file.status().intersects(MODIFIED) { - modified_files = true; - } - if file.status().intersects(STAGED) { - staged_files = true; - } - if file.status().intersects(NEW) { - new_files = true; - } - } - } - - let head = git.head(); - let mut commits = false; - let mut branch = None; - let mut remote_branch_diff = None; - - if let Ok(head) = head { - commits = true; - if head.is_branch() { - branch = head.shorthand().map(ToString::to_string); - remote_branch_diff = - head.resolve() - .ok() - .map(|head| { - ( - head.target(), - head.shorthand().map(ToString::to_string), - ) - }) - .and_then(|(head_id, name)| { - head_id.and_then(|head_id| { - name.and_then(|name| { - git.refname_to_id(&format!( - "refs/remotes/origin/{}", - name - )) - .ok() - .and_then(|remote_id| { - git.graph_ahead_behind( - head_id, remote_id, - ) - .ok() - }) - }) - }) - }); - } else { - branch = - head.resolve().ok().and_then(|head| head.target()).map( - |oid| { - let mut sha: String = oid - .as_bytes() - .iter() - .take(4) - .map(|b| format!("{:02x}", b)) - .collect(); - sha.truncate(7); - sha - }, - ); - } - } - - let active_operation = match git.state() { - git2::RepositoryState::Merge => ActiveOperation::Merge, - git2::RepositoryState::Revert - | git2::RepositoryState::RevertSequence => { - ActiveOperation::Revert - } - git2::RepositoryState::CherryPick - | git2::RepositoryState::CherryPickSequence => { - ActiveOperation::CherryPick - } - git2::RepositoryState::Bisect => ActiveOperation::Bisect, - git2::RepositoryState::Rebase - | git2::RepositoryState::RebaseInteractive - | git2::RepositoryState::RebaseMerge => ActiveOperation::Rebase, - _ => ActiveOperation::None, - }; - - Self { - modified_files, - staged_files, - new_files, - commits, - active_operation, - branch, - remote_branch_diff, - } - } -} - -impl std::fmt::Display for Info { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "g")?; - - if self.modified_files { - write!(f, "*")?; - } - if self.staged_files { - write!(f, "+")?; - } - if self.new_files { - write!(f, "?")?; - } - if !self.commits { - write!(f, "!")?; - return Ok(()); - } - - let branch = self.branch.as_ref().map_or("???", |branch| { - if branch == "master" { - "" - } else { - branch - } - }); - if !branch.is_empty() { - write!(f, ":")?; - } - write!(f, "{}", branch)?; - - if let Some((local, remote)) = self.remote_branch_diff { - if local > 0 || remote > 0 { - write!(f, ":")?; - } - if local > 0 { - write!(f, "+{}", local)?; - } - if remote > 0 { - write!(f, "-{}", remote)?; - } - } else { - write!(f, ":-")?; - } - - write!(f, "{}", self.active_operation)?; - - Ok(()) - } -} - -#[derive(Debug, Copy, Clone)] -pub enum ActiveOperation { - None, - Merge, - Revert, - CherryPick, - Bisect, - Rebase, -} - -impl std::fmt::Display for ActiveOperation { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - ActiveOperation::None => Ok(()), - ActiveOperation::Merge => write!(f, "(m)"), - ActiveOperation::Revert => write!(f, "(v)"), - ActiveOperation::CherryPick => write!(f, "(c)"), - ActiveOperation::Bisect => write!(f, "(b)"), - ActiveOperation::Rebase => write!(f, "(r)"), - } - } -} diff --git a/src/shell/history/job.rs b/src/shell/history/job.rs index 365a06d..d3d112a 100644 --- a/src/shell/history/job.rs +++ b/src/shell/history/job.rs @@ -19,7 +19,7 @@ impl Job { let state = std::sync::Arc::new(std::sync::Mutex::new( State::Running((0, 0)), )); - tokio::task::spawn(job_task( + tokio::spawn(Self::task( child, fh, std::sync::Arc::clone(&state), @@ -66,6 +66,83 @@ impl Job { } }); } + + async fn task( + mut child: tokio::process::Child, + fh: std::fs::File, + state: std::sync::Arc>, + env: Env, + event_w: crate::shell::event::Writer, + ) { + enum Res { + Read(crate::runner::Event), + Exit(std::io::Result), + } + + let (read_w, read_r) = tokio::sync::mpsc::unbounded_channel(); + tokio::task::spawn_blocking(move || loop { + let event = bincode::deserialize_from(&fh); + match event { + Ok(event) => { + read_w.send(event).unwrap(); + } + Err(e) => { + match &*e { + bincode::ErrorKind::Io(io_e) => { + assert!( + io_e.kind() + == std::io::ErrorKind::UnexpectedEof + ); + } + e => { + panic!("{}", e); + } + } + break; + } + } + }); + + let mut stream: futures_util::stream::SelectAll<_> = [ + tokio_stream::wrappers::UnboundedReceiverStream::new(read_r) + .map(Res::Read) + .boxed(), + futures_util::stream::once(child.wait()) + .map(Res::Exit) + .boxed(), + ] + .into_iter() + .collect(); + let mut exit_status = None; + let mut new_env = None; + while let Some(res) = stream.next().await { + match res { + Res::Read(event) => match event { + crate::runner::Event::RunPipeline(new_span) => { + // we could just update the span in place here, but we + // do this as an event so that we can also trigger a + // refresh + event_w.send(Event::ChildRunPipeline( + env.idx(), + new_span, + )); + } + crate::runner::Event::Suspend => { + event_w.send(Event::ChildSuspend(env.idx())); + } + crate::runner::Event::Exit(env) => { + new_env = Some(env); + } + }, + Res::Exit(status) => { + exit_status = Some(status.unwrap()); + } + } + } + *state.lock().unwrap() = + State::Exited(ExitInfo::new(exit_status.unwrap())); + event_w.send(Event::ChildExit(env.idx(), new_env)); + } } pub enum State { @@ -108,79 +185,6 @@ impl ExitInfo { } } -async fn job_task( - mut child: tokio::process::Child, - fh: std::fs::File, - state: std::sync::Arc>, - env: Env, - event_w: crate::shell::event::Writer, -) { - enum Res { - Read(crate::runner::Event), - Exit(std::io::Result), - } - - let (read_w, read_r) = tokio::sync::mpsc::unbounded_channel(); - tokio::task::spawn_blocking(move || loop { - let event = bincode::deserialize_from(&fh); - match event { - Ok(event) => { - read_w.send(event).unwrap(); - } - Err(e) => { - match &*e { - bincode::ErrorKind::Io(io_e) => { - assert!( - io_e.kind() == std::io::ErrorKind::UnexpectedEof - ); - } - e => { - panic!("{}", e); - } - } - break; - } - } - }); - - let mut stream: futures_util::stream::SelectAll<_> = [ - tokio_stream::wrappers::UnboundedReceiverStream::new(read_r) - .map(Res::Read) - .boxed(), - futures_util::stream::once(child.wait()) - .map(Res::Exit) - .boxed(), - ] - .into_iter() - .collect(); - let mut exit_status = None; - let mut new_env = None; - while let Some(res) = stream.next().await { - match res { - Res::Read(event) => match event { - crate::runner::Event::RunPipeline(new_span) => { - // we could just update the span in place here, but we do - // this as an event so that we can also trigger a refresh - event_w - .send(Event::ChildRunPipeline(env.idx(), new_span)); - } - crate::runner::Event::Suspend => { - event_w.send(Event::ChildSuspend(env.idx())); - } - crate::runner::Event::Exit(env) => { - new_env = Some(env); - } - }, - Res::Exit(status) => { - exit_status = Some(status.unwrap()); - } - } - } - *state.lock().unwrap() = - State::Exited(ExitInfo::new(exit_status.unwrap())); - event_w.send(Event::ChildExit(env.idx(), new_env)); -} - fn spawn_command( cmdline: &str, env: &Env, diff --git a/src/shell/history/pty.rs b/src/shell/history/pty.rs index 49681d4..91b9cbb 100644 --- a/src/shell/history/pty.rs +++ b/src/shell/history/pty.rs @@ -26,7 +26,7 @@ impl Pty { super::pty::Vt::new(size), )); - tokio::task::spawn(pty_task( + tokio::spawn(Self::task( pty, std::sync::Arc::clone(&vt), request_r, @@ -66,6 +66,57 @@ impl Pty { #[allow(clippy::let_underscore_drop)] let _ = self.request_w.send(Request::Resize(size.0, size.1)); } + + async fn task( + pty: pty_process::Pty, + vt: std::sync::Arc>, + request_r: tokio::sync::mpsc::UnboundedReceiver, + event_w: crate::shell::event::Writer, + ) { + enum Res { + Read(Result), + Request(Request), + } + + let (pty_r, mut pty_w) = pty.into_split(); + let mut stream: futures_util::stream::SelectAll<_> = [ + tokio_util::io::ReaderStream::new(pty_r) + .map(Res::Read) + .boxed(), + tokio_stream::wrappers::UnboundedReceiverStream::new(request_r) + .map(Res::Request) + .boxed(), + ] + .into_iter() + .collect(); + while let Some(res) = stream.next().await { + match res { + Res::Read(res) => match res { + Ok(bytes) => { + vt.lock().unwrap().process(&bytes); + event_w.send(Event::PtyOutput); + } + Err(e) => { + // this means that there are no longer any open pts + // fds. we could alternately signal this through an + // explicit channel at ChildExit time, but this seems + // reliable enough. + if e.raw_os_error() == Some(libc::EIO) { + return; + } + panic!("pty read failed: {:?}", e); + } + }, + Res::Request(Request::Input(bytes)) => { + pty_w.write(&bytes).await.unwrap(); + } + Res::Request(Request::Resize(row, col)) => { + pty_w.resize(pty_process::Size::new(row, col)).unwrap(); + vt.lock().unwrap().set_size((row, col)); + } + } + } + } } pub struct Vt { @@ -161,54 +212,3 @@ impl Vt { last_row } } - -async fn pty_task( - pty: pty_process::Pty, - vt: std::sync::Arc>, - request_r: tokio::sync::mpsc::UnboundedReceiver, - event_w: crate::shell::event::Writer, -) { - enum Res { - Read(Result), - Request(Request), - } - - let (pty_r, mut pty_w) = pty.into_split(); - let mut stream: futures_util::stream::SelectAll<_> = [ - tokio_util::io::ReaderStream::new(pty_r) - .map(Res::Read) - .boxed(), - tokio_stream::wrappers::UnboundedReceiverStream::new(request_r) - .map(Res::Request) - .boxed(), - ] - .into_iter() - .collect(); - while let Some(res) = stream.next().await { - match res { - Res::Read(res) => match res { - Ok(bytes) => { - vt.lock().unwrap().process(&bytes); - event_w.send(Event::PtyOutput); - } - Err(e) => { - // this means that there are no longer any open pts fds. - // we could alternately signal this through an explicit - // channel at ChildExit time, but this seems reliable - // enough. - if e.raw_os_error() == Some(libc::EIO) { - return; - } - panic!("pty read failed: {:?}", e); - } - }, - Res::Request(Request::Input(bytes)) => { - pty_w.write(&bytes).await.unwrap(); - } - Res::Request(Request::Resize(row, col)) => { - pty_w.resize(pty_process::Size::new(row, col)).unwrap(); - vt.lock().unwrap().set_size((row, col)); - } - } - } -} diff --git a/src/shell/inputs/clock.rs b/src/shell/inputs/clock.rs new file mode 100644 index 0000000..250466e --- /dev/null +++ b/src/shell/inputs/clock.rs @@ -0,0 +1,27 @@ +use crate::shell::prelude::*; + +pub struct Handler; + +impl Handler { + pub fn new(event_w: crate::shell::event::Writer) -> Self { + tokio::spawn(Self::task(event_w)); + Self + } + + async fn task(event_w: crate::shell::event::Writer) { + let now_clock = time::OffsetDateTime::now_utc(); + let now_instant = tokio::time::Instant::now(); + let mut interval = tokio::time::interval_at( + now_instant + + std::time::Duration::from_nanos( + 1_000_000_000_u64 + .saturating_sub(now_clock.nanosecond().into()), + ), + std::time::Duration::from_secs(1), + ); + loop { + interval.tick().await; + event_w.send(Event::ClockTimer); + } + } +} diff --git a/src/shell/inputs/git.rs b/src/shell/inputs/git.rs new file mode 100644 index 0000000..1c1c92d --- /dev/null +++ b/src/shell/inputs/git.rs @@ -0,0 +1,277 @@ +use crate::shell::prelude::*; + +use notify::Watcher as _; + +pub struct Handler { + git_w: tokio::sync::mpsc::UnboundedSender, +} + +impl Handler { + pub fn new(event_w: crate::shell::event::Writer) -> Self { + let (git_w, git_r) = tokio::sync::mpsc::unbounded_channel(); + tokio::spawn(Self::task(git_r, event_w)); + Self { git_w } + } + + pub fn new_dir(&self, path: std::path::PathBuf) { + self.git_w.send(path).unwrap(); + } + + async fn task( + mut git_r: tokio::sync::mpsc::UnboundedReceiver, + event_w: crate::shell::event::Writer, + ) { + // clippy can't tell that we assign to this later + #[allow(clippy::no_effect_underscore_binding)] + let mut _active_watcher = None; + while let Some(mut dir) = git_r.recv().await { + while let Ok(newer_dir) = git_r.try_recv() { + dir = newer_dir; + } + let repo = git2::Repository::discover(&dir).ok(); + if repo.is_some() { + let (sync_watch_w, sync_watch_r) = std::sync::mpsc::channel(); + let (watch_w, mut watch_r) = + tokio::sync::mpsc::unbounded_channel(); + let mut watcher = notify::RecommendedWatcher::new( + sync_watch_w, + std::time::Duration::from_millis(100), + ) + .unwrap(); + watcher + .watch(&dir, notify::RecursiveMode::Recursive) + .unwrap(); + tokio::task::spawn_blocking(move || { + while let Ok(event) = sync_watch_r.recv() { + if watch_w.send(event).is_err() { + break; + } + } + }); + let event_w = event_w.clone(); + tokio::spawn(async move { + while watch_r.recv().await.is_some() { + let repo = git2::Repository::discover(&dir).ok(); + let info = tokio::task::spawn_blocking(|| { + repo.map(|repo| Info::new(&repo)) + }) + .await + .unwrap(); + event_w.send(Event::GitInfo(info)); + } + }); + _active_watcher = Some(watcher); + } else { + _active_watcher = None; + } + let info = tokio::task::spawn_blocking(|| { + repo.map(|repo| Info::new(&repo)) + }) + .await + .unwrap(); + event_w.send(Event::GitInfo(info)); + } + } +} + +#[derive(Debug)] +pub struct Info { + modified_files: bool, + staged_files: bool, + new_files: bool, + commits: bool, + active_operation: ActiveOperation, + branch: Option, + remote_branch_diff: Option<(usize, usize)>, +} + +const MODIFIED: git2::Status = git2::Status::WT_DELETED + .union(git2::Status::WT_MODIFIED) + .union(git2::Status::WT_RENAMED) + .union(git2::Status::WT_TYPECHANGE) + .union(git2::Status::CONFLICTED); +const STAGED: git2::Status = git2::Status::INDEX_DELETED + .union(git2::Status::INDEX_MODIFIED) + .union(git2::Status::INDEX_NEW) + .union(git2::Status::INDEX_RENAMED) + .union(git2::Status::INDEX_TYPECHANGE); +const NEW: git2::Status = git2::Status::WT_NEW; + +impl Info { + pub fn new(git: &git2::Repository) -> Self { + let mut status_options = git2::StatusOptions::new(); + status_options.include_untracked(true); + status_options.update_index(true); + + let statuses = git.statuses(Some(&mut status_options)); + + let mut modified_files = false; + let mut staged_files = false; + let mut new_files = false; + if let Ok(statuses) = statuses { + for file in statuses.iter() { + if file.status().intersects(MODIFIED) { + modified_files = true; + } + if file.status().intersects(STAGED) { + staged_files = true; + } + if file.status().intersects(NEW) { + new_files = true; + } + } + } + + let head = git.head(); + let mut commits = false; + let mut branch = None; + let mut remote_branch_diff = None; + + if let Ok(head) = head { + commits = true; + if head.is_branch() { + branch = head.shorthand().map(ToString::to_string); + remote_branch_diff = + head.resolve() + .ok() + .map(|head| { + ( + head.target(), + head.shorthand().map(ToString::to_string), + ) + }) + .and_then(|(head_id, name)| { + head_id.and_then(|head_id| { + name.and_then(|name| { + git.refname_to_id(&format!( + "refs/remotes/origin/{}", + name + )) + .ok() + .and_then(|remote_id| { + git.graph_ahead_behind( + head_id, remote_id, + ) + .ok() + }) + }) + }) + }); + } else { + branch = + head.resolve().ok().and_then(|head| head.target()).map( + |oid| { + let mut sha: String = oid + .as_bytes() + .iter() + .take(4) + .map(|b| format!("{:02x}", b)) + .collect(); + sha.truncate(7); + sha + }, + ); + } + } + + let active_operation = match git.state() { + git2::RepositoryState::Merge => ActiveOperation::Merge, + git2::RepositoryState::Revert + | git2::RepositoryState::RevertSequence => { + ActiveOperation::Revert + } + git2::RepositoryState::CherryPick + | git2::RepositoryState::CherryPickSequence => { + ActiveOperation::CherryPick + } + git2::RepositoryState::Bisect => ActiveOperation::Bisect, + git2::RepositoryState::Rebase + | git2::RepositoryState::RebaseInteractive + | git2::RepositoryState::RebaseMerge => ActiveOperation::Rebase, + _ => ActiveOperation::None, + }; + + Self { + modified_files, + staged_files, + new_files, + commits, + active_operation, + branch, + remote_branch_diff, + } + } +} + +impl std::fmt::Display for Info { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "g")?; + + if self.modified_files { + write!(f, "*")?; + } + if self.staged_files { + write!(f, "+")?; + } + if self.new_files { + write!(f, "?")?; + } + if !self.commits { + write!(f, "!")?; + return Ok(()); + } + + let branch = self.branch.as_ref().map_or("???", |branch| { + if branch == "master" { + "" + } else { + branch + } + }); + if !branch.is_empty() { + write!(f, ":")?; + } + write!(f, "{}", branch)?; + + if let Some((local, remote)) = self.remote_branch_diff { + if local > 0 || remote > 0 { + write!(f, ":")?; + } + if local > 0 { + write!(f, "+{}", local)?; + } + if remote > 0 { + write!(f, "-{}", remote)?; + } + } else { + write!(f, ":-")?; + } + + write!(f, "{}", self.active_operation)?; + + Ok(()) + } +} + +#[derive(Debug, Copy, Clone)] +pub enum ActiveOperation { + None, + Merge, + Revert, + CherryPick, + Bisect, + Rebase, +} + +impl std::fmt::Display for ActiveOperation { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ActiveOperation::None => Ok(()), + ActiveOperation::Merge => write!(f, "(m)"), + ActiveOperation::Revert => write!(f, "(v)"), + ActiveOperation::CherryPick => write!(f, "(c)"), + ActiveOperation::Bisect => write!(f, "(b)"), + ActiveOperation::Rebase => write!(f, "(r)"), + } + } +} diff --git a/src/shell/inputs/mod.rs b/src/shell/inputs/mod.rs new file mode 100644 index 0000000..48590a2 --- /dev/null +++ b/src/shell/inputs/mod.rs @@ -0,0 +1,32 @@ +use crate::shell::prelude::*; + +mod clock; +mod git; +pub use git::Info as GitInfo; +mod signals; +mod stdin; + +pub struct Handler { + _clock: clock::Handler, + git: git::Handler, + _signals: signals::Handler, + _stdin: stdin::Handler, +} + +impl Handler { + pub fn new( + input: textmode::blocking::Input, + event_w: crate::shell::event::Writer, + ) -> Result { + Ok(Self { + _clock: clock::Handler::new(event_w.clone()), + git: git::Handler::new(event_w.clone()), + _signals: signals::Handler::new(event_w.clone())?, + _stdin: stdin::Handler::new(input, event_w), + }) + } + + pub fn new_dir(&self, path: std::path::PathBuf) { + self.git.new_dir(path); + } +} diff --git a/src/shell/inputs/signals.rs b/src/shell/inputs/signals.rs new file mode 100644 index 0000000..4b91273 --- /dev/null +++ b/src/shell/inputs/signals.rs @@ -0,0 +1,30 @@ +use crate::shell::prelude::*; + +pub struct Handler; + +impl Handler { + pub fn new(event_w: crate::shell::event::Writer) -> Result { + let signals = tokio::signal::unix::signal( + tokio::signal::unix::SignalKind::window_change(), + )?; + tokio::spawn(Self::task(signals, event_w)); + Ok(Self) + } + + async fn task( + mut signals: tokio::signal::unix::Signal, + event_w: crate::shell::event::Writer, + ) { + event_w.send(resize_event()); + while signals.recv().await.is_some() { + event_w.send(resize_event()); + } + } +} + +fn resize_event() -> Event { + Event::Resize(terminal_size::terminal_size().map_or( + (24, 80), + |(terminal_size::Width(w), terminal_size::Height(h))| (h, w), + )) +} diff --git a/src/shell/inputs/stdin.rs b/src/shell/inputs/stdin.rs new file mode 100644 index 0000000..b966307 --- /dev/null +++ b/src/shell/inputs/stdin.rs @@ -0,0 +1,17 @@ +use crate::shell::prelude::*; + +pub struct Handler; + +impl Handler { + pub fn new( + mut input: textmode::blocking::Input, + event_w: crate::shell::event::Writer, + ) -> Self { + std::thread::spawn(move || { + while let Some(key) = input.read_key().unwrap() { + event_w.send(Event::Key(key)); + } + }); + Self + } +} diff --git a/src/shell/mod.rs b/src/shell/mod.rs index b23ab1e..0f42cde 100644 --- a/src/shell/mod.rs +++ b/src/shell/mod.rs @@ -1,11 +1,10 @@ use crate::shell::prelude::*; -use notify::Watcher as _; use textmode::Textmode as _; mod event; -mod git; mod history; +mod inputs; mod prelude; mod readline; @@ -20,135 +19,16 @@ pub async fn main() -> Result { let (event_w, event_r) = event::channel(); - { - let mut signals = tokio::signal::unix::signal( - tokio::signal::unix::SignalKind::window_change(), - )?; - let event_w = event_w.clone(); - tokio::task::spawn(async move { - event_w.send(Event::Resize( - terminal_size::terminal_size().map_or( - (24, 80), - |(terminal_size::Width(w), terminal_size::Height(h))| { - (h, w) - }, - ), - )); - while signals.recv().await.is_some() { - event_w.send(Event::Resize( - terminal_size::terminal_size().map_or( - (24, 80), - |( - terminal_size::Width(w), - terminal_size::Height(h), - )| { (h, w) }, - ), - )); - } - }); - } - - { - let event_w = event_w.clone(); - std::thread::spawn(move || { - while let Some(key) = input.read_key().unwrap() { - event_w.send(Event::Key(key)); - } - }); - } - - // redraw the clock every second - { - let event_w = event_w.clone(); - tokio::task::spawn(async move { - let now_clock = time::OffsetDateTime::now_utc(); - let now_instant = tokio::time::Instant::now(); - let mut interval = tokio::time::interval_at( - now_instant - + std::time::Duration::from_nanos( - 1_000_000_000_u64 - .saturating_sub(now_clock.nanosecond().into()), - ), - std::time::Duration::from_secs(1), - ); - loop { - interval.tick().await; - event_w.send(Event::ClockTimer); - } - }); - } - - let (git_w, mut git_r): ( - tokio::sync::mpsc::UnboundedSender, - _, - ) = tokio::sync::mpsc::unbounded_channel(); - { - let event_w = event_w.clone(); - // clippy can't tell that we assign to this later - #[allow(clippy::no_effect_underscore_binding)] - let mut _active_watcher = None; - tokio::task::spawn(async move { - while let Some(mut dir) = git_r.recv().await { - while let Ok(newer_dir) = git_r.try_recv() { - dir = newer_dir; - } - let repo = git2::Repository::discover(&dir).ok(); - if repo.is_some() { - let (sync_watch_w, sync_watch_r) = - std::sync::mpsc::channel(); - let (watch_w, mut watch_r) = - tokio::sync::mpsc::unbounded_channel(); - let mut watcher = notify::RecommendedWatcher::new( - sync_watch_w, - std::time::Duration::from_millis(100), - ) - .unwrap(); - watcher - .watch(&dir, notify::RecursiveMode::Recursive) - .unwrap(); - tokio::task::spawn_blocking(move || { - while let Ok(event) = sync_watch_r.recv() { - let watch_w = watch_w.clone(); - let send_failed = watch_w.send(event).is_err(); - if send_failed { - break; - } - } - }); - let event_w = event_w.clone(); - tokio::task::spawn(async move { - while watch_r.recv().await.is_some() { - let repo = git2::Repository::discover(&dir).ok(); - let info = tokio::task::spawn_blocking(|| { - repo.map(|repo| git::Info::new(&repo)) - }) - .await - .unwrap(); - event_w.send(Event::GitInfo(info)); - } - }); - _active_watcher = Some(watcher); - } else { - _active_watcher = None; - } - let info = tokio::task::spawn_blocking(|| { - repo.map(|repo| git::Info::new(&repo)) - }) - .await - .unwrap(); - event_w.send(Event::GitInfo(info)); - } - }); - } + let inputs = inputs::Handler::new(input, event_w.clone()).unwrap(); let mut shell = Shell::new(crate::info::get_offset())?; let mut prev_dir = shell.env.pwd().to_path_buf(); - git_w.send(prev_dir.clone()).unwrap(); + inputs.new_dir(prev_dir.clone()); while let Some(event) = event_r.recv().await { let dir = shell.env().pwd(); if dir != prev_dir { prev_dir = dir.to_path_buf(); - git_w.send(dir.to_path_buf()).unwrap(); + inputs.new_dir(dir.to_path_buf()); } match shell.handle_event(event, &event_w) { Some(Action::Refresh) => { @@ -196,7 +76,7 @@ pub struct Shell { readline: readline::Readline, history: history::History, env: Env, - git: Option, + git: Option, focus: Focus, scene: Scene, escape: bool, diff --git a/src/shell/readline.rs b/src/shell/readline.rs index 5de9901..654d264 100644 --- a/src/shell/readline.rs +++ b/src/shell/readline.rs @@ -23,7 +23,7 @@ impl Readline { &self, out: &mut impl textmode::Textmode, env: &Env, - git: Option<&super::git::Info>, + git: Option<&super::inputs::GitInfo>, focus: bool, offset: time::UtcOffset, ) -> Result<()> { -- cgit v1.2.3-54-g00ecf