summaryrefslogtreecommitdiffstats
path: root/src/shell
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2022-02-25 17:32:58 -0500
committerJesse Luehrs <doy@tozt.net>2022-02-25 17:32:58 -0500
commita2462bbaea13f7a3f3eb65e7430b30618bc203b8 (patch)
tree8cb2eab6904c2c9f4138f8221caa947c6b805d3b /src/shell
parent7b66471194490a1421001fd51d073cc6d18848ea (diff)
downloadnbsh-a2462bbaea13f7a3f3eb65e7430b30618bc203b8.tar.gz
nbsh-a2462bbaea13f7a3f3eb65e7430b30618bc203b8.zip
move to tokio
Diffstat (limited to 'src/shell')
-rw-r--r--src/shell/event.rs38
-rw-r--r--src/shell/history/entry.rs20
-rw-r--r--src/shell/history/mod.rs152
-rw-r--r--src/shell/history/pty.rs118
-rw-r--r--src/shell/mod.rs115
5 files changed, 207 insertions, 236 deletions
diff --git a/src/shell/event.rs b/src/shell/event.rs
index 025f3c4..ad14705 100644
--- a/src/shell/event.rs
+++ b/src/shell/event.rs
@@ -11,22 +11,23 @@ pub enum Event {
}
pub struct Reader {
- pending: async_std::sync::Mutex<Pending>,
- cvar: async_std::sync::Condvar,
+ pending: tokio::sync::Mutex<Pending>,
+ cvar: tokio::sync::Notify,
}
impl Reader {
pub fn new(
- input: async_std::channel::Receiver<Event>,
- ) -> async_std::sync::Arc<Self> {
- let this = async_std::sync::Arc::new(Self {
- pending: async_std::sync::Mutex::new(Pending::new()),
- cvar: async_std::sync::Condvar::new(),
- });
+ mut input: tokio::sync::mpsc::UnboundedReceiver<Event>,
+ ) -> std::sync::Arc<Self> {
+ let this = Self {
+ pending: tokio::sync::Mutex::new(Pending::new()),
+ cvar: tokio::sync::Notify::new(),
+ };
+ let this = std::sync::Arc::new(this);
{
- let this = async_std::sync::Arc::clone(&this);
- async_std::task::spawn(async move {
- while let Ok(event) = input.recv().await {
+ let this = this.clone();
+ tokio::task::spawn(async move {
+ while let Some(event) = input.recv().await {
this.new_event(Some(event)).await;
}
this.new_event(None).await;
@@ -36,13 +37,14 @@ impl Reader {
}
pub async fn recv(&self) -> Option<Event> {
- let mut pending = self
- .cvar
- .wait_until(self.pending.lock().await, |pending| {
- pending.has_event()
- })
- .await;
- pending.get_event()
+ loop {
+ let mut pending = self.pending.lock().await;
+ if pending.has_event() {
+ return pending.get_event();
+ }
+ drop(pending);
+ self.cvar.notified().await;
+ }
}
async fn new_event(&self, event: Option<Event>) {
diff --git a/src/shell/history/entry.rs b/src/shell/history/entry.rs
index a45d99d..97e8a7b 100644
--- a/src/shell/history/entry.rs
+++ b/src/shell/history/entry.rs
@@ -16,8 +16,8 @@ pub struct Entry {
visual_bell: bool,
real_bell_pending: bool,
fullscreen: Option<bool>,
- input: async_std::channel::Sender<Vec<u8>>,
- resize: async_std::channel::Sender<(u16, u16)>,
+ input: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
+ resize: tokio::sync::mpsc::UnboundedSender<(u16, u16)>,
start_time: time::OffsetDateTime,
start_instant: std::time::Instant,
}
@@ -27,8 +27,8 @@ impl Entry {
cmdline: String,
env: Env,
size: (u16, u16),
- input: async_std::channel::Sender<Vec<u8>>,
- resize: async_std::channel::Sender<(u16, u16)>,
+ input: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
+ resize: tokio::sync::mpsc::UnboundedSender<(u16, u16)>,
) -> Self {
let span = (0, cmdline.len());
Self {
@@ -229,13 +229,13 @@ impl Entry {
pub async fn send_input(&self, bytes: Vec<u8>) {
if self.running() {
- self.input.send(bytes).await.unwrap();
+ self.input.send(bytes).unwrap();
}
}
pub async fn resize(&mut self, size: (u16, u16)) {
if self.running() {
- self.resize.send(size).await.unwrap();
+ self.resize.send(size).unwrap();
self.vt.set_size(size.0, size.1);
}
}
@@ -341,11 +341,11 @@ impl Entry {
pub async fn finish(
&mut self,
env: Env,
- event_w: async_std::channel::Sender<Event>,
+ event_w: tokio::sync::mpsc::UnboundedSender<Event>,
) {
self.state = State::Exited(ExitInfo::new(env.latest_status()));
self.env = env;
- event_w.send(Event::PtyClose).await.unwrap();
+ event_w.send(Event::PtyClose).unwrap();
}
fn exit_info(&self) -> Option<&ExitInfo> {
@@ -369,12 +369,12 @@ impl Entry {
}
struct ExitInfo {
- status: async_std::process::ExitStatus,
+ status: std::process::ExitStatus,
instant: std::time::Instant,
}
impl ExitInfo {
- fn new(status: async_std::process::ExitStatus) -> Self {
+ fn new(status: std::process::ExitStatus) -> Self {
Self {
status,
instant: std::time::Instant::now(),
diff --git a/src/shell/history/mod.rs b/src/shell/history/mod.rs
index 1bc4e62..2eeab0b 100644
--- a/src/shell/history/mod.rs
+++ b/src/shell/history/mod.rs
@@ -67,7 +67,7 @@ impl History {
out: &mut impl textmode::Textmode,
idx: usize,
) {
- let mut entry = self.entries[idx].lock_arc().await;
+ let mut entry = self.entries[idx].clone().lock_owned().await;
entry.render_fullscreen(out);
}
@@ -78,7 +78,7 @@ impl History {
pub async fn resize(&mut self, size: (u16, u16)) {
self.size = size;
for entry in &self.entries {
- entry.lock_arc().await.resize(size).await;
+ entry.clone().lock_owned().await.resize(size).await;
}
}
@@ -86,10 +86,10 @@ impl History {
&mut self,
cmdline: &str,
env: &Env,
- event_w: async_std::channel::Sender<Event>,
+ event_w: tokio::sync::mpsc::UnboundedSender<Event>,
) -> anyhow::Result<usize> {
- let (input_w, input_r) = async_std::channel::unbounded();
- let (resize_w, resize_r) = async_std::channel::unbounded();
+ let (input_w, input_r) = tokio::sync::mpsc::unbounded_channel();
+ let (resize_w, resize_r) = tokio::sync::mpsc::unbounded_channel();
let entry = crate::mutex::new(Entry::new(
cmdline.to_string(),
@@ -112,7 +112,7 @@ impl History {
}
pub async fn entry(&self, idx: usize) -> crate::mutex::Guard<Entry> {
- self.entries[idx].lock_arc().await
+ self.entries[idx].clone().lock_owned().await
}
pub fn entry_count(&self) -> usize {
@@ -173,7 +173,7 @@ impl History {
for (idx, entry) in
self.entries.iter().enumerate().rev().skip(self.scroll_pos)
{
- let entry = entry.lock_arc().await;
+ let entry = entry.clone().lock_owned().await;
let focused = focus.map_or(false, |focus| idx == focus);
used_lines +=
entry.lines(self.entry_count(), focused && !scrolling);
@@ -221,13 +221,13 @@ fn run_commands(
cmdline: String,
entry: crate::mutex::Mutex<Entry>,
mut env: Env,
- input_r: async_std::channel::Receiver<Vec<u8>>,
- resize_r: async_std::channel::Receiver<(u16, u16)>,
- event_w: async_std::channel::Sender<Event>,
+ input_r: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
+ resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>,
+ event_w: tokio::sync::mpsc::UnboundedSender<Event>,
) {
- async_std::task::spawn(async move {
+ tokio::task::spawn(async move {
let pty = match pty::Pty::new(
- entry.lock_arc().await.size(),
+ entry.clone().lock_owned().await.size(),
&entry,
input_r,
resize_r,
@@ -235,14 +235,12 @@ fn run_commands(
) {
Ok(pty) => pty,
Err(e) => {
- let mut entry = entry.lock_arc().await;
+ let mut entry = entry.clone().lock_owned().await;
entry.process(
format!("nbsh: failed to allocate pty: {}\r\n", e)
.as_bytes(),
);
- env.set_status(async_std::process::ExitStatus::from_raw(
- 1 << 8,
- ));
+ env.set_status(std::process::ExitStatus::from_raw(1 << 8));
entry.finish(env, event_w).await;
return;
}
@@ -254,7 +252,7 @@ fn run_commands(
{
Ok(status) => status,
Err(e) => {
- let mut entry = entry.lock_arc().await;
+ let mut entry = entry.clone().lock_owned().await;
entry.process(
format!(
"nbsh: failed to spawn {}: {}\r\n",
@@ -262,7 +260,7 @@ fn run_commands(
)
.as_bytes(),
);
- env.set_status(async_std::process::ExitStatus::from_raw(
+ env.set_status(std::process::ExitStatus::from_raw(
1 << 8,
));
entry.finish(env, event_w).await;
@@ -271,7 +269,7 @@ fn run_commands(
};
env.set_status(status);
- entry.lock_arc().await.finish(env, event_w).await;
+ entry.clone().lock_owned().await.finish(env, event_w).await;
pty.close().await;
});
}
@@ -280,12 +278,19 @@ async fn spawn_commands(
cmdline: &str,
pty: &pty::Pty,
env: &mut Env,
- event_w: async_std::channel::Sender<Event>,
-) -> anyhow::Result<async_std::process::ExitStatus> {
+ event_w: tokio::sync::mpsc::UnboundedSender<Event>,
+) -> anyhow::Result<std::process::ExitStatus> {
+ enum Res {
+ Read(crate::runner::Event),
+ Exit(std::io::Result<std::process::ExitStatus>),
+ }
+
let mut cmd = pty_process::Command::new(std::env::current_exe()?);
cmd.args(&["-c", cmdline, "--status-fd", "3"]);
env.apply(&mut cmd);
let (from_r, from_w) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC)?;
+ // Safety: from_r was just opened above and is not used anywhere else
+ let fh = unsafe { std::fs::File::from_raw_fd(from_r) };
// Safety: dup2 is an async-signal-safe function
unsafe {
cmd.pre_exec(move || {
@@ -293,90 +298,63 @@ async fn spawn_commands(
Ok(())
});
}
- let child = pty.spawn(cmd)?;
+ let mut child = pty.spawn(cmd)?;
nix::unistd::close(from_w)?;
- let (read_w, read_r) = async_std::channel::unbounded();
- let new_read = move || {
- let read_w = read_w.clone();
- async_std::task::spawn(async move {
- let event = blocking::unblock(move || {
- // Safety: from_r was just opened above and is only
- // referenced in this closure, which takes ownership of it
- // at the start and returns ownership of it at the end
- let fh = unsafe { std::fs::File::from_raw_fd(from_r) };
- let event = bincode::deserialize_from(&fh);
- let _ = fh.into_raw_fd();
- event
- })
- .await;
- if read_w.is_closed() {
- // we should never drop read_r while there are still valid
- // things to read
- assert!(event.is_err());
- } else {
- read_w.send(event).await.unwrap();
+ let (read_w, read_r) = tokio::sync::mpsc::unbounded_channel();
+ tokio::task::spawn_blocking(move || loop {
+ let event = bincode::deserialize_from(&fh);
+ match event {
+ Ok(event) => {
+ read_w.send(event).unwrap();
+ }
+ Err(e) => {
+ match &*e {
+ bincode::ErrorKind::Io(io_e) => {
+ assert!(
+ io_e.kind() == std::io::ErrorKind::UnexpectedEof
+ );
+ }
+ e => {
+ panic!("{}", e);
+ }
+ }
+ break;
}
- });
- };
-
- new_read();
- let mut read_done = false;
- let mut exit_done = None;
- loop {
- enum Res {
- Read(bincode::Result<crate::runner::Event>),
- Exit(std::io::Result<std::process::ExitStatus>),
}
+ });
- let read_r = read_r.clone();
- let read = async move { Res::Read(read_r.recv().await.unwrap()) };
- let exit = async {
- Res::Exit(if exit_done.is_none() {
- child.status_no_drop().await
- } else {
- std::future::pending().await
- })
- };
- match read.or(exit).await {
- Res::Read(Ok(event)) => match event {
+ let mut stream: futures_util::stream::SelectAll<_> = [
+ tokio_stream::wrappers::UnboundedReceiverStream::new(read_r)
+ .map(Res::Read)
+ .boxed(),
+ futures_util::stream::once(child.wait())
+ .map(Res::Exit)
+ .boxed(),
+ ]
+ .into_iter()
+ .collect();
+ let mut exit_status = None;
+ while let Some(res) = stream.next().await {
+ match res {
+ Res::Read(event) => match event {
crate::runner::Event::RunPipeline(idx, span) => {
- event_w
- .send(Event::ChildRunPipeline(idx, span))
- .await
- .unwrap();
- new_read();
+ event_w.send(Event::ChildRunPipeline(idx, span)).unwrap();
}
crate::runner::Event::Suspend(idx) => {
- event_w.send(Event::ChildSuspend(idx)).await.unwrap();
- new_read();
+ event_w.send(Event::ChildSuspend(idx)).unwrap();
}
crate::runner::Event::Exit(new_env) => {
*env = new_env;
- read_done = true;
}
},
- Res::Read(Err(e)) => {
- if let bincode::ErrorKind::Io(io_e) = &*e {
- if io_e.kind() == std::io::ErrorKind::UnexpectedEof {
- read_done = true;
- } else {
- anyhow::bail!(e);
- }
- } else {
- anyhow::bail!(e);
- }
- }
Res::Exit(Ok(status)) => {
- exit_done = Some(status);
+ exit_status = Some(status);
}
Res::Exit(Err(e)) => {
anyhow::bail!(e);
}
}
- if let (true, Some(status)) = (read_done, exit_done) {
- nix::unistd::close(from_r)?;
- return Ok(status);
- }
}
+ Ok(exit_status.unwrap())
}
diff --git a/src/shell/history/pty.rs b/src/shell/history/pty.rs
index 5a51e73..acfe500 100644
--- a/src/shell/history/pty.rs
+++ b/src/shell/history/pty.rs
@@ -1,26 +1,26 @@
use crate::shell::prelude::*;
pub struct Pty {
- pty: async_std::sync::Arc<pty_process::Pty>,
- close_w: async_std::channel::Sender<()>,
+ pts: pty_process::Pts,
+ close_w: tokio::sync::mpsc::UnboundedSender<()>,
}
impl Pty {
pub fn new(
size: (u16, u16),
entry: &crate::mutex::Mutex<super::Entry>,
- input_r: async_std::channel::Receiver<Vec<u8>>,
- resize_r: async_std::channel::Receiver<(u16, u16)>,
- event_w: async_std::channel::Sender<Event>,
+ input_r: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
+ resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>,
+ event_w: tokio::sync::mpsc::UnboundedSender<Event>,
) -> anyhow::Result<Self> {
- let (close_w, close_r) = async_std::channel::unbounded();
+ let (close_w, close_r) = tokio::sync::mpsc::unbounded_channel();
let pty = pty_process::Pty::new()?;
pty.resize(pty_process::Size::new(size.0, size.1))?;
- let pty = async_std::sync::Arc::new(pty);
+ let pts = pty.pts()?;
- async_std::task::spawn(pty_task(
- async_std::sync::Arc::clone(&pty),
+ tokio::task::spawn(pty_task(
+ pty,
crate::mutex::clone(entry),
input_r,
resize_r,
@@ -28,80 +28,74 @@ impl Pty {
event_w,
));
- Ok(Self { pty, close_w })
+ Ok(Self { pts, close_w })
}
pub fn spawn(
&self,
mut cmd: pty_process::Command,
- ) -> anyhow::Result<async_std::process::Child> {
- Ok(cmd.spawn(&self.pty)?)
+ ) -> anyhow::Result<tokio::process::Child> {
+ Ok(cmd.spawn(&self.pts)?)
}
pub async fn close(&self) {
- self.close_w.send(()).await.unwrap();
+ self.close_w.send(()).unwrap();
}
}
async fn pty_task(
- pty: async_std::sync::Arc<pty_process::Pty>,
+ pty: pty_process::Pty,
entry: crate::mutex::Mutex<super::Entry>,
- input_r: async_std::channel::Receiver<Vec<u8>>,
- resize_r: async_std::channel::Receiver<(u16, u16)>,
- close_r: async_std::channel::Receiver<()>,
- event_w: async_std::channel::Sender<Event>,
+ input_r: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
+ resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>,
+ close_r: tokio::sync::mpsc::UnboundedReceiver<()>,
+ event_w: tokio::sync::mpsc::UnboundedSender<Event>,
) {
- loop {
- enum Res {
- Read(Result<usize, std::io::Error>),
- Write(Result<Vec<u8>, async_std::channel::RecvError>),
- Resize(Result<(u16, u16), async_std::channel::RecvError>),
- Close(Result<(), async_std::channel::RecvError>),
- }
- let mut buf = [0_u8; 4096];
- let read = async { Res::Read((&*pty).read(&mut buf).await) };
- let write = async { Res::Write(input_r.recv().await) };
- let resize = async { Res::Resize(resize_r.recv().await) };
- let close = async { Res::Close(close_r.recv().await) };
- match read.race(write).race(resize).or(close).await {
+ enum Res {
+ Read(Result<bytes::Bytes, std::io::Error>),
+ Write(Vec<u8>),
+ Resize((u16, u16)),
+ Close(()),
+ }
+
+ let (pty_r, mut pty_w) = pty.into_split();
+ let mut stream: futures_util::stream::SelectAll<_> = [
+ tokio_util::io::ReaderStream::new(pty_r)
+ .map(Res::Read)
+ .boxed(),
+ tokio_stream::wrappers::UnboundedReceiverStream::new(input_r)
+ .map(Res::Write)
+ .boxed(),
+ tokio_stream::wrappers::UnboundedReceiverStream::new(resize_r)
+ .map(Res::Resize)
+ .boxed(),
+ tokio_stream::wrappers::UnboundedReceiverStream::new(close_r)
+ .map(Res::Close)
+ .boxed(),
+ ]
+ .into_iter()
+ .collect();
+ while let Some(res) = stream.next().await {
+ match res {
Res::Read(res) => match res {
Ok(bytes) => {
- entry.lock_arc().await.process(&buf[..bytes]);
- event_w.send(Event::PtyOutput).await.unwrap();
+ entry.clone().lock_owned().await.process(&bytes);
+ event_w.send(Event::PtyOutput).unwrap();
}
Err(e) => {
- if e.raw_os_error() == Some(libc::EIO) {
- continue;
- }
panic!("pty read failed: {:?}", e);
}
},
- Res::Write(res) => match res {
- Ok(bytes) => {
- (&*pty).write(&bytes).await.unwrap();
- }
- Err(e) => {
- panic!("failed to read from input channel: {}", e);
- }
- },
- Res::Resize(res) => match res {
- Ok(size) => {
- pty.resize(pty_process::Size::new(size.0, size.1))
- .unwrap();
- }
- Err(e) => {
- panic!("failed to read from resize channel: {}", e);
- }
- },
- Res::Close(res) => match res {
- Ok(()) => {
- event_w.send(Event::PtyClose).await.unwrap();
- return;
- }
- Err(e) => {
- panic!("failed to read from close channel: {}", e);
- }
- },
+ Res::Write(bytes) => {
+ pty_w.write(&bytes).await.unwrap();
+ }
+ Res::Resize(size) => pty_w
+ .resize(pty_process::Size::new(size.0, size.1))
+ .unwrap(),
+ Res::Close(()) => {
+ event_w.send(Event::PtyClose).unwrap();
+ return;
+ }
}
}
}
diff --git a/src/shell/mod.rs b/src/shell/mod.rs
index 9c4002b..82d2021 100644
--- a/src/shell/mod.rs
+++ b/src/shell/mod.rs
@@ -10,7 +10,7 @@ mod prelude;
mod readline;
pub async fn main() -> anyhow::Result<i32> {
- let mut input = textmode::Input::new().await?;
+ let mut input = textmode::blocking::Input::new()?;
let mut output = textmode::Output::new().await?;
// avoid the guards getting stuck in a task that doesn't run to
@@ -18,23 +18,23 @@ pub async fn main() -> anyhow::Result<i32> {
let _input_guard = input.take_raw_guard();
let _output_guard = output.take_screen_guard();
- let (event_w, event_r) = async_std::channel::unbounded();
+ let (event_w, event_r) = tokio::sync::mpsc::unbounded_channel();
{
- // nix::sys::signal::Signal is repr(i32)
- #[allow(clippy::as_conversions)]
- let signals = signal_hook_async_std::Signals::new(&[
- nix::sys::signal::Signal::SIGWINCH as i32,
- ])?;
+ let mut signals = tokio::signal::unix::signal(
+ tokio::signal::unix::SignalKind::window_change(),
+ )?;
let event_w = event_w.clone();
- async_std::task::spawn(async move {
- // nix::sys::signal::Signal is repr(i32)
- #[allow(clippy::as_conversions)]
- let mut signals = async_std::stream::once(
- nix::sys::signal::Signal::SIGWINCH as i32,
- )
- .chain(signals);
- while signals.next().await.is_some() {
+ tokio::task::spawn(async move {
+ event_w
+ .send(Event::Resize(terminal_size::terminal_size().map_or(
+ (24, 80),
+ |(terminal_size::Width(w), terminal_size::Height(h))| {
+ (h, w)
+ },
+ )))
+ .unwrap();
+ while signals.recv().await.is_some() {
event_w
.send(Event::Resize(
terminal_size::terminal_size().map_or(
@@ -45,7 +45,6 @@ pub async fn main() -> anyhow::Result<i32> {
)| { (h, w) },
),
))
- .await
.unwrap();
}
});
@@ -53,9 +52,9 @@ pub async fn main() -> anyhow::Result<i32> {
{
let event_w = event_w.clone();
- async_std::task::spawn(async move {
- while let Some(key) = input.read_key().await.unwrap() {
- event_w.send(Event::Key(key)).await.unwrap();
+ std::thread::spawn(move || {
+ while let Some(key) = input.read_key().unwrap() {
+ event_w.send(Event::Key(key)).unwrap();
}
});
}
@@ -63,33 +62,35 @@ pub async fn main() -> anyhow::Result<i32> {
// redraw the clock every second
{
let event_w = event_w.clone();
- async_std::task::spawn(async move {
- let first_sleep = 1_000_000_000_u64.saturating_sub(
- time::OffsetDateTime::now_utc().nanosecond().into(),
- );
- async_std::task::sleep(std::time::Duration::from_nanos(
- first_sleep,
- ))
- .await;
- let mut interval = async_std::stream::interval(
+ tokio::task::spawn(async move {
+ let now_clock = time::OffsetDateTime::now_utc();
+ let now_instant = tokio::time::Instant::now();
+ let mut interval = tokio::time::interval_at(
+ now_instant
+ + std::time::Duration::from_nanos(
+ 1_000_000_000_u64
+ .saturating_sub(now_clock.nanosecond().into()),
+ ),
std::time::Duration::from_secs(1),
);
- event_w.send(Event::ClockTimer).await.unwrap();
- while interval.next().await.is_some() {
- event_w.send(Event::ClockTimer).await.unwrap();
+ loop {
+ interval.tick().await;
+ event_w.send(Event::ClockTimer).unwrap();
}
});
}
- let (git_w, git_r): (async_std::channel::Sender<std::path::PathBuf>, _) =
- async_std::channel::unbounded();
+ let (git_w, mut git_r): (
+ tokio::sync::mpsc::UnboundedSender<std::path::PathBuf>,
+ _,
+ ) = tokio::sync::mpsc::unbounded_channel();
{
let event_w = event_w.clone();
// clippy can't tell that we assign to this later
#[allow(clippy::no_effect_underscore_binding)]
let mut _active_watcher = None;
- async_std::task::spawn(async move {
- while let Ok(mut dir) = git_r.recv().await {
+ tokio::task::spawn(async move {
+ while let Some(mut dir) = git_r.recv().await {
while let Ok(newer_dir) = git_r.try_recv() {
dir = newer_dir;
}
@@ -97,7 +98,8 @@ pub async fn main() -> anyhow::Result<i32> {
if repo.is_some() {
let (sync_watch_w, sync_watch_r) =
std::sync::mpsc::channel();
- let (watch_w, watch_r) = async_std::channel::unbounded();
+ let (watch_w, mut watch_r) =
+ tokio::sync::mpsc::unbounded_channel();
let mut watcher = notify::RecommendedWatcher::new(
sync_watch_w,
std::time::Duration::from_millis(100),
@@ -106,31 +108,25 @@ pub async fn main() -> anyhow::Result<i32> {
watcher
.watch(&dir, notify::RecursiveMode::Recursive)
.unwrap();
- async_std::task::spawn(blocking::unblock(move || {
+ tokio::task::spawn_blocking(move || {
while let Ok(event) = sync_watch_r.recv() {
let watch_w = watch_w.clone();
- let send_failed =
- async_std::task::block_on(async move {
- watch_w.send(event).await.is_err()
- });
+ let send_failed = watch_w.send(event).is_err();
if send_failed {
break;
}
}
- }));
+ });
let event_w = event_w.clone();
- async_std::task::spawn(async move {
- while watch_r.recv().await.is_ok() {
+ tokio::task::spawn(async move {
+ while watch_r.recv().await.is_some() {
let repo = git2::Repository::discover(&dir).ok();
- let info = blocking::unblock(|| {
+ let info = tokio::task::spawn_blocking(|| {
repo.map(|repo| git::Info::new(&repo))
})
- .await;
- if event_w
- .send(Event::GitInfo(info))
- .await
- .is_err()
- {
+ .await
+ .unwrap();
+ if event_w.send(Event::GitInfo(info)).is_err() {
break;
}
}
@@ -139,24 +135,25 @@ pub async fn main() -> anyhow::Result<i32> {
} else {
_active_watcher = None;
}
- let info = blocking::unblock(|| {
+ let info = tokio::task::spawn_blocking(|| {
repo.map(|repo| git::Info::new(&repo))
})
- .await;
- event_w.send(Event::GitInfo(info)).await.unwrap();
+ .await
+ .unwrap();
+ event_w.send(Event::GitInfo(info)).unwrap();
}
});
}
let mut shell = Shell::new(crate::info::get_offset())?;
let mut prev_dir = shell.env.pwd().to_path_buf();
- git_w.send(prev_dir.clone()).await.unwrap();
+ git_w.send(prev_dir.clone()).unwrap();
let event_reader = event::Reader::new(event_r);
while let Some(event) = event_reader.recv().await {
let dir = shell.env().pwd();
if dir != prev_dir {
prev_dir = dir.to_path_buf();
- git_w.send(dir.to_path_buf()).await.unwrap();
+ git_w.send(dir.to_path_buf()).unwrap();
}
match shell.handle_event(event, &event_w).await {
Some(Action::Refresh) => {
@@ -322,7 +319,7 @@ impl Shell {
pub async fn handle_event(
&mut self,
event: Event,
- event_w: &async_std::channel::Sender<Event>,
+ event_w: &tokio::sync::mpsc::UnboundedSender<Event>,
) -> Option<Action> {
match event {
Event::Key(key) => {
@@ -405,7 +402,7 @@ impl Shell {
async fn handle_key_escape(
&mut self,
key: textmode::Key,
- event_w: async_std::channel::Sender<Event>,
+ event_w: tokio::sync::mpsc::UnboundedSender<Event>,
) -> Option<Action> {
match key {
textmode::Key::Ctrl(b'd') => {
@@ -514,7 +511,7 @@ impl Shell {
async fn handle_key_readline(
&mut self,
key: textmode::Key,
- event_w: async_std::channel::Sender<Event>,
+ event_w: tokio::sync::mpsc::UnboundedSender<Event>,
) -> Option<Action> {
match key {
textmode::Key::Char(c) => {