From a2462bbaea13f7a3f3eb65e7430b30618bc203b8 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Fri, 25 Feb 2022 17:32:58 -0500 Subject: move to tokio --- src/shell/event.rs | 38 ++++++------ src/shell/history/entry.rs | 20 +++--- src/shell/history/mod.rs | 152 +++++++++++++++++++-------------------------- src/shell/history/pty.rs | 118 +++++++++++++++++------------------ src/shell/mod.rs | 115 +++++++++++++++++----------------- 5 files changed, 207 insertions(+), 236 deletions(-) (limited to 'src/shell') diff --git a/src/shell/event.rs b/src/shell/event.rs index 025f3c4..ad14705 100644 --- a/src/shell/event.rs +++ b/src/shell/event.rs @@ -11,22 +11,23 @@ pub enum Event { } pub struct Reader { - pending: async_std::sync::Mutex, - cvar: async_std::sync::Condvar, + pending: tokio::sync::Mutex, + cvar: tokio::sync::Notify, } impl Reader { pub fn new( - input: async_std::channel::Receiver, - ) -> async_std::sync::Arc { - let this = async_std::sync::Arc::new(Self { - pending: async_std::sync::Mutex::new(Pending::new()), - cvar: async_std::sync::Condvar::new(), - }); + mut input: tokio::sync::mpsc::UnboundedReceiver, + ) -> std::sync::Arc { + let this = Self { + pending: tokio::sync::Mutex::new(Pending::new()), + cvar: tokio::sync::Notify::new(), + }; + let this = std::sync::Arc::new(this); { - let this = async_std::sync::Arc::clone(&this); - async_std::task::spawn(async move { - while let Ok(event) = input.recv().await { + let this = this.clone(); + tokio::task::spawn(async move { + while let Some(event) = input.recv().await { this.new_event(Some(event)).await; } this.new_event(None).await; @@ -36,13 +37,14 @@ impl Reader { } pub async fn recv(&self) -> Option { - let mut pending = self - .cvar - .wait_until(self.pending.lock().await, |pending| { - pending.has_event() - }) - .await; - pending.get_event() + loop { + let mut pending = self.pending.lock().await; + if pending.has_event() { + return pending.get_event(); + } + drop(pending); + self.cvar.notified().await; + } } async fn new_event(&self, event: Option) { diff --git a/src/shell/history/entry.rs b/src/shell/history/entry.rs index a45d99d..97e8a7b 100644 --- a/src/shell/history/entry.rs +++ b/src/shell/history/entry.rs @@ -16,8 +16,8 @@ pub struct Entry { visual_bell: bool, real_bell_pending: bool, fullscreen: Option, - input: async_std::channel::Sender>, - resize: async_std::channel::Sender<(u16, u16)>, + input: tokio::sync::mpsc::UnboundedSender>, + resize: tokio::sync::mpsc::UnboundedSender<(u16, u16)>, start_time: time::OffsetDateTime, start_instant: std::time::Instant, } @@ -27,8 +27,8 @@ impl Entry { cmdline: String, env: Env, size: (u16, u16), - input: async_std::channel::Sender>, - resize: async_std::channel::Sender<(u16, u16)>, + input: tokio::sync::mpsc::UnboundedSender>, + resize: tokio::sync::mpsc::UnboundedSender<(u16, u16)>, ) -> Self { let span = (0, cmdline.len()); Self { @@ -229,13 +229,13 @@ impl Entry { pub async fn send_input(&self, bytes: Vec) { if self.running() { - self.input.send(bytes).await.unwrap(); + self.input.send(bytes).unwrap(); } } pub async fn resize(&mut self, size: (u16, u16)) { if self.running() { - self.resize.send(size).await.unwrap(); + self.resize.send(size).unwrap(); self.vt.set_size(size.0, size.1); } } @@ -341,11 +341,11 @@ impl Entry { pub async fn finish( &mut self, env: Env, - event_w: async_std::channel::Sender, + event_w: tokio::sync::mpsc::UnboundedSender, ) { self.state = State::Exited(ExitInfo::new(env.latest_status())); self.env = env; - event_w.send(Event::PtyClose).await.unwrap(); + event_w.send(Event::PtyClose).unwrap(); } fn exit_info(&self) -> Option<&ExitInfo> { @@ -369,12 +369,12 @@ impl Entry { } struct ExitInfo { - status: async_std::process::ExitStatus, + status: std::process::ExitStatus, instant: std::time::Instant, } impl ExitInfo { - fn new(status: async_std::process::ExitStatus) -> Self { + fn new(status: std::process::ExitStatus) -> Self { Self { status, instant: std::time::Instant::now(), diff --git a/src/shell/history/mod.rs b/src/shell/history/mod.rs index 1bc4e62..2eeab0b 100644 --- a/src/shell/history/mod.rs +++ b/src/shell/history/mod.rs @@ -67,7 +67,7 @@ impl History { out: &mut impl textmode::Textmode, idx: usize, ) { - let mut entry = self.entries[idx].lock_arc().await; + let mut entry = self.entries[idx].clone().lock_owned().await; entry.render_fullscreen(out); } @@ -78,7 +78,7 @@ impl History { pub async fn resize(&mut self, size: (u16, u16)) { self.size = size; for entry in &self.entries { - entry.lock_arc().await.resize(size).await; + entry.clone().lock_owned().await.resize(size).await; } } @@ -86,10 +86,10 @@ impl History { &mut self, cmdline: &str, env: &Env, - event_w: async_std::channel::Sender, + event_w: tokio::sync::mpsc::UnboundedSender, ) -> anyhow::Result { - let (input_w, input_r) = async_std::channel::unbounded(); - let (resize_w, resize_r) = async_std::channel::unbounded(); + let (input_w, input_r) = tokio::sync::mpsc::unbounded_channel(); + let (resize_w, resize_r) = tokio::sync::mpsc::unbounded_channel(); let entry = crate::mutex::new(Entry::new( cmdline.to_string(), @@ -112,7 +112,7 @@ impl History { } pub async fn entry(&self, idx: usize) -> crate::mutex::Guard { - self.entries[idx].lock_arc().await + self.entries[idx].clone().lock_owned().await } pub fn entry_count(&self) -> usize { @@ -173,7 +173,7 @@ impl History { for (idx, entry) in self.entries.iter().enumerate().rev().skip(self.scroll_pos) { - let entry = entry.lock_arc().await; + let entry = entry.clone().lock_owned().await; let focused = focus.map_or(false, |focus| idx == focus); used_lines += entry.lines(self.entry_count(), focused && !scrolling); @@ -221,13 +221,13 @@ fn run_commands( cmdline: String, entry: crate::mutex::Mutex, mut env: Env, - input_r: async_std::channel::Receiver>, - resize_r: async_std::channel::Receiver<(u16, u16)>, - event_w: async_std::channel::Sender, + input_r: tokio::sync::mpsc::UnboundedReceiver>, + resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>, + event_w: tokio::sync::mpsc::UnboundedSender, ) { - async_std::task::spawn(async move { + tokio::task::spawn(async move { let pty = match pty::Pty::new( - entry.lock_arc().await.size(), + entry.clone().lock_owned().await.size(), &entry, input_r, resize_r, @@ -235,14 +235,12 @@ fn run_commands( ) { Ok(pty) => pty, Err(e) => { - let mut entry = entry.lock_arc().await; + let mut entry = entry.clone().lock_owned().await; entry.process( format!("nbsh: failed to allocate pty: {}\r\n", e) .as_bytes(), ); - env.set_status(async_std::process::ExitStatus::from_raw( - 1 << 8, - )); + env.set_status(std::process::ExitStatus::from_raw(1 << 8)); entry.finish(env, event_w).await; return; } @@ -254,7 +252,7 @@ fn run_commands( { Ok(status) => status, Err(e) => { - let mut entry = entry.lock_arc().await; + let mut entry = entry.clone().lock_owned().await; entry.process( format!( "nbsh: failed to spawn {}: {}\r\n", @@ -262,7 +260,7 @@ fn run_commands( ) .as_bytes(), ); - env.set_status(async_std::process::ExitStatus::from_raw( + env.set_status(std::process::ExitStatus::from_raw( 1 << 8, )); entry.finish(env, event_w).await; @@ -271,7 +269,7 @@ fn run_commands( }; env.set_status(status); - entry.lock_arc().await.finish(env, event_w).await; + entry.clone().lock_owned().await.finish(env, event_w).await; pty.close().await; }); } @@ -280,12 +278,19 @@ async fn spawn_commands( cmdline: &str, pty: &pty::Pty, env: &mut Env, - event_w: async_std::channel::Sender, -) -> anyhow::Result { + event_w: tokio::sync::mpsc::UnboundedSender, +) -> anyhow::Result { + enum Res { + Read(crate::runner::Event), + Exit(std::io::Result), + } + let mut cmd = pty_process::Command::new(std::env::current_exe()?); cmd.args(&["-c", cmdline, "--status-fd", "3"]); env.apply(&mut cmd); let (from_r, from_w) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC)?; + // Safety: from_r was just opened above and is not used anywhere else + let fh = unsafe { std::fs::File::from_raw_fd(from_r) }; // Safety: dup2 is an async-signal-safe function unsafe { cmd.pre_exec(move || { @@ -293,90 +298,63 @@ async fn spawn_commands( Ok(()) }); } - let child = pty.spawn(cmd)?; + let mut child = pty.spawn(cmd)?; nix::unistd::close(from_w)?; - let (read_w, read_r) = async_std::channel::unbounded(); - let new_read = move || { - let read_w = read_w.clone(); - async_std::task::spawn(async move { - let event = blocking::unblock(move || { - // Safety: from_r was just opened above and is only - // referenced in this closure, which takes ownership of it - // at the start and returns ownership of it at the end - let fh = unsafe { std::fs::File::from_raw_fd(from_r) }; - let event = bincode::deserialize_from(&fh); - let _ = fh.into_raw_fd(); - event - }) - .await; - if read_w.is_closed() { - // we should never drop read_r while there are still valid - // things to read - assert!(event.is_err()); - } else { - read_w.send(event).await.unwrap(); + let (read_w, read_r) = tokio::sync::mpsc::unbounded_channel(); + tokio::task::spawn_blocking(move || loop { + let event = bincode::deserialize_from(&fh); + match event { + Ok(event) => { + read_w.send(event).unwrap(); + } + Err(e) => { + match &*e { + bincode::ErrorKind::Io(io_e) => { + assert!( + io_e.kind() == std::io::ErrorKind::UnexpectedEof + ); + } + e => { + panic!("{}", e); + } + } + break; } - }); - }; - - new_read(); - let mut read_done = false; - let mut exit_done = None; - loop { - enum Res { - Read(bincode::Result), - Exit(std::io::Result), } + }); - let read_r = read_r.clone(); - let read = async move { Res::Read(read_r.recv().await.unwrap()) }; - let exit = async { - Res::Exit(if exit_done.is_none() { - child.status_no_drop().await - } else { - std::future::pending().await - }) - }; - match read.or(exit).await { - Res::Read(Ok(event)) => match event { + let mut stream: futures_util::stream::SelectAll<_> = [ + tokio_stream::wrappers::UnboundedReceiverStream::new(read_r) + .map(Res::Read) + .boxed(), + futures_util::stream::once(child.wait()) + .map(Res::Exit) + .boxed(), + ] + .into_iter() + .collect(); + let mut exit_status = None; + while let Some(res) = stream.next().await { + match res { + Res::Read(event) => match event { crate::runner::Event::RunPipeline(idx, span) => { - event_w - .send(Event::ChildRunPipeline(idx, span)) - .await - .unwrap(); - new_read(); + event_w.send(Event::ChildRunPipeline(idx, span)).unwrap(); } crate::runner::Event::Suspend(idx) => { - event_w.send(Event::ChildSuspend(idx)).await.unwrap(); - new_read(); + event_w.send(Event::ChildSuspend(idx)).unwrap(); } crate::runner::Event::Exit(new_env) => { *env = new_env; - read_done = true; } }, - Res::Read(Err(e)) => { - if let bincode::ErrorKind::Io(io_e) = &*e { - if io_e.kind() == std::io::ErrorKind::UnexpectedEof { - read_done = true; - } else { - anyhow::bail!(e); - } - } else { - anyhow::bail!(e); - } - } Res::Exit(Ok(status)) => { - exit_done = Some(status); + exit_status = Some(status); } Res::Exit(Err(e)) => { anyhow::bail!(e); } } - if let (true, Some(status)) = (read_done, exit_done) { - nix::unistd::close(from_r)?; - return Ok(status); - } } + Ok(exit_status.unwrap()) } diff --git a/src/shell/history/pty.rs b/src/shell/history/pty.rs index 5a51e73..acfe500 100644 --- a/src/shell/history/pty.rs +++ b/src/shell/history/pty.rs @@ -1,26 +1,26 @@ use crate::shell::prelude::*; pub struct Pty { - pty: async_std::sync::Arc, - close_w: async_std::channel::Sender<()>, + pts: pty_process::Pts, + close_w: tokio::sync::mpsc::UnboundedSender<()>, } impl Pty { pub fn new( size: (u16, u16), entry: &crate::mutex::Mutex, - input_r: async_std::channel::Receiver>, - resize_r: async_std::channel::Receiver<(u16, u16)>, - event_w: async_std::channel::Sender, + input_r: tokio::sync::mpsc::UnboundedReceiver>, + resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>, + event_w: tokio::sync::mpsc::UnboundedSender, ) -> anyhow::Result { - let (close_w, close_r) = async_std::channel::unbounded(); + let (close_w, close_r) = tokio::sync::mpsc::unbounded_channel(); let pty = pty_process::Pty::new()?; pty.resize(pty_process::Size::new(size.0, size.1))?; - let pty = async_std::sync::Arc::new(pty); + let pts = pty.pts()?; - async_std::task::spawn(pty_task( - async_std::sync::Arc::clone(&pty), + tokio::task::spawn(pty_task( + pty, crate::mutex::clone(entry), input_r, resize_r, @@ -28,80 +28,74 @@ impl Pty { event_w, )); - Ok(Self { pty, close_w }) + Ok(Self { pts, close_w }) } pub fn spawn( &self, mut cmd: pty_process::Command, - ) -> anyhow::Result { - Ok(cmd.spawn(&self.pty)?) + ) -> anyhow::Result { + Ok(cmd.spawn(&self.pts)?) } pub async fn close(&self) { - self.close_w.send(()).await.unwrap(); + self.close_w.send(()).unwrap(); } } async fn pty_task( - pty: async_std::sync::Arc, + pty: pty_process::Pty, entry: crate::mutex::Mutex, - input_r: async_std::channel::Receiver>, - resize_r: async_std::channel::Receiver<(u16, u16)>, - close_r: async_std::channel::Receiver<()>, - event_w: async_std::channel::Sender, + input_r: tokio::sync::mpsc::UnboundedReceiver>, + resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>, + close_r: tokio::sync::mpsc::UnboundedReceiver<()>, + event_w: tokio::sync::mpsc::UnboundedSender, ) { - loop { - enum Res { - Read(Result), - Write(Result, async_std::channel::RecvError>), - Resize(Result<(u16, u16), async_std::channel::RecvError>), - Close(Result<(), async_std::channel::RecvError>), - } - let mut buf = [0_u8; 4096]; - let read = async { Res::Read((&*pty).read(&mut buf).await) }; - let write = async { Res::Write(input_r.recv().await) }; - let resize = async { Res::Resize(resize_r.recv().await) }; - let close = async { Res::Close(close_r.recv().await) }; - match read.race(write).race(resize).or(close).await { + enum Res { + Read(Result), + Write(Vec), + Resize((u16, u16)), + Close(()), + } + + let (pty_r, mut pty_w) = pty.into_split(); + let mut stream: futures_util::stream::SelectAll<_> = [ + tokio_util::io::ReaderStream::new(pty_r) + .map(Res::Read) + .boxed(), + tokio_stream::wrappers::UnboundedReceiverStream::new(input_r) + .map(Res::Write) + .boxed(), + tokio_stream::wrappers::UnboundedReceiverStream::new(resize_r) + .map(Res::Resize) + .boxed(), + tokio_stream::wrappers::UnboundedReceiverStream::new(close_r) + .map(Res::Close) + .boxed(), + ] + .into_iter() + .collect(); + while let Some(res) = stream.next().await { + match res { Res::Read(res) => match res { Ok(bytes) => { - entry.lock_arc().await.process(&buf[..bytes]); - event_w.send(Event::PtyOutput).await.unwrap(); + entry.clone().lock_owned().await.process(&bytes); + event_w.send(Event::PtyOutput).unwrap(); } Err(e) => { - if e.raw_os_error() == Some(libc::EIO) { - continue; - } panic!("pty read failed: {:?}", e); } }, - Res::Write(res) => match res { - Ok(bytes) => { - (&*pty).write(&bytes).await.unwrap(); - } - Err(e) => { - panic!("failed to read from input channel: {}", e); - } - }, - Res::Resize(res) => match res { - Ok(size) => { - pty.resize(pty_process::Size::new(size.0, size.1)) - .unwrap(); - } - Err(e) => { - panic!("failed to read from resize channel: {}", e); - } - }, - Res::Close(res) => match res { - Ok(()) => { - event_w.send(Event::PtyClose).await.unwrap(); - return; - } - Err(e) => { - panic!("failed to read from close channel: {}", e); - } - }, + Res::Write(bytes) => { + pty_w.write(&bytes).await.unwrap(); + } + Res::Resize(size) => pty_w + .resize(pty_process::Size::new(size.0, size.1)) + .unwrap(), + Res::Close(()) => { + event_w.send(Event::PtyClose).unwrap(); + return; + } } } } diff --git a/src/shell/mod.rs b/src/shell/mod.rs index 9c4002b..82d2021 100644 --- a/src/shell/mod.rs +++ b/src/shell/mod.rs @@ -10,7 +10,7 @@ mod prelude; mod readline; pub async fn main() -> anyhow::Result { - let mut input = textmode::Input::new().await?; + let mut input = textmode::blocking::Input::new()?; let mut output = textmode::Output::new().await?; // avoid the guards getting stuck in a task that doesn't run to @@ -18,23 +18,23 @@ pub async fn main() -> anyhow::Result { let _input_guard = input.take_raw_guard(); let _output_guard = output.take_screen_guard(); - let (event_w, event_r) = async_std::channel::unbounded(); + let (event_w, event_r) = tokio::sync::mpsc::unbounded_channel(); { - // nix::sys::signal::Signal is repr(i32) - #[allow(clippy::as_conversions)] - let signals = signal_hook_async_std::Signals::new(&[ - nix::sys::signal::Signal::SIGWINCH as i32, - ])?; + let mut signals = tokio::signal::unix::signal( + tokio::signal::unix::SignalKind::window_change(), + )?; let event_w = event_w.clone(); - async_std::task::spawn(async move { - // nix::sys::signal::Signal is repr(i32) - #[allow(clippy::as_conversions)] - let mut signals = async_std::stream::once( - nix::sys::signal::Signal::SIGWINCH as i32, - ) - .chain(signals); - while signals.next().await.is_some() { + tokio::task::spawn(async move { + event_w + .send(Event::Resize(terminal_size::terminal_size().map_or( + (24, 80), + |(terminal_size::Width(w), terminal_size::Height(h))| { + (h, w) + }, + ))) + .unwrap(); + while signals.recv().await.is_some() { event_w .send(Event::Resize( terminal_size::terminal_size().map_or( @@ -45,7 +45,6 @@ pub async fn main() -> anyhow::Result { )| { (h, w) }, ), )) - .await .unwrap(); } }); @@ -53,9 +52,9 @@ pub async fn main() -> anyhow::Result { { let event_w = event_w.clone(); - async_std::task::spawn(async move { - while let Some(key) = input.read_key().await.unwrap() { - event_w.send(Event::Key(key)).await.unwrap(); + std::thread::spawn(move || { + while let Some(key) = input.read_key().unwrap() { + event_w.send(Event::Key(key)).unwrap(); } }); } @@ -63,33 +62,35 @@ pub async fn main() -> anyhow::Result { // redraw the clock every second { let event_w = event_w.clone(); - async_std::task::spawn(async move { - let first_sleep = 1_000_000_000_u64.saturating_sub( - time::OffsetDateTime::now_utc().nanosecond().into(), - ); - async_std::task::sleep(std::time::Duration::from_nanos( - first_sleep, - )) - .await; - let mut interval = async_std::stream::interval( + tokio::task::spawn(async move { + let now_clock = time::OffsetDateTime::now_utc(); + let now_instant = tokio::time::Instant::now(); + let mut interval = tokio::time::interval_at( + now_instant + + std::time::Duration::from_nanos( + 1_000_000_000_u64 + .saturating_sub(now_clock.nanosecond().into()), + ), std::time::Duration::from_secs(1), ); - event_w.send(Event::ClockTimer).await.unwrap(); - while interval.next().await.is_some() { - event_w.send(Event::ClockTimer).await.unwrap(); + loop { + interval.tick().await; + event_w.send(Event::ClockTimer).unwrap(); } }); } - let (git_w, git_r): (async_std::channel::Sender, _) = - async_std::channel::unbounded(); + let (git_w, mut git_r): ( + tokio::sync::mpsc::UnboundedSender, + _, + ) = tokio::sync::mpsc::unbounded_channel(); { let event_w = event_w.clone(); // clippy can't tell that we assign to this later #[allow(clippy::no_effect_underscore_binding)] let mut _active_watcher = None; - async_std::task::spawn(async move { - while let Ok(mut dir) = git_r.recv().await { + tokio::task::spawn(async move { + while let Some(mut dir) = git_r.recv().await { while let Ok(newer_dir) = git_r.try_recv() { dir = newer_dir; } @@ -97,7 +98,8 @@ pub async fn main() -> anyhow::Result { if repo.is_some() { let (sync_watch_w, sync_watch_r) = std::sync::mpsc::channel(); - let (watch_w, watch_r) = async_std::channel::unbounded(); + let (watch_w, mut watch_r) = + tokio::sync::mpsc::unbounded_channel(); let mut watcher = notify::RecommendedWatcher::new( sync_watch_w, std::time::Duration::from_millis(100), @@ -106,31 +108,25 @@ pub async fn main() -> anyhow::Result { watcher .watch(&dir, notify::RecursiveMode::Recursive) .unwrap(); - async_std::task::spawn(blocking::unblock(move || { + tokio::task::spawn_blocking(move || { while let Ok(event) = sync_watch_r.recv() { let watch_w = watch_w.clone(); - let send_failed = - async_std::task::block_on(async move { - watch_w.send(event).await.is_err() - }); + let send_failed = watch_w.send(event).is_err(); if send_failed { break; } } - })); + }); let event_w = event_w.clone(); - async_std::task::spawn(async move { - while watch_r.recv().await.is_ok() { + tokio::task::spawn(async move { + while watch_r.recv().await.is_some() { let repo = git2::Repository::discover(&dir).ok(); - let info = blocking::unblock(|| { + let info = tokio::task::spawn_blocking(|| { repo.map(|repo| git::Info::new(&repo)) }) - .await; - if event_w - .send(Event::GitInfo(info)) - .await - .is_err() - { + .await + .unwrap(); + if event_w.send(Event::GitInfo(info)).is_err() { break; } } @@ -139,24 +135,25 @@ pub async fn main() -> anyhow::Result { } else { _active_watcher = None; } - let info = blocking::unblock(|| { + let info = tokio::task::spawn_blocking(|| { repo.map(|repo| git::Info::new(&repo)) }) - .await; - event_w.send(Event::GitInfo(info)).await.unwrap(); + .await + .unwrap(); + event_w.send(Event::GitInfo(info)).unwrap(); } }); } let mut shell = Shell::new(crate::info::get_offset())?; let mut prev_dir = shell.env.pwd().to_path_buf(); - git_w.send(prev_dir.clone()).await.unwrap(); + git_w.send(prev_dir.clone()).unwrap(); let event_reader = event::Reader::new(event_r); while let Some(event) = event_reader.recv().await { let dir = shell.env().pwd(); if dir != prev_dir { prev_dir = dir.to_path_buf(); - git_w.send(dir.to_path_buf()).await.unwrap(); + git_w.send(dir.to_path_buf()).unwrap(); } match shell.handle_event(event, &event_w).await { Some(Action::Refresh) => { @@ -322,7 +319,7 @@ impl Shell { pub async fn handle_event( &mut self, event: Event, - event_w: &async_std::channel::Sender, + event_w: &tokio::sync::mpsc::UnboundedSender, ) -> Option { match event { Event::Key(key) => { @@ -405,7 +402,7 @@ impl Shell { async fn handle_key_escape( &mut self, key: textmode::Key, - event_w: async_std::channel::Sender, + event_w: tokio::sync::mpsc::UnboundedSender, ) -> Option { match key { textmode::Key::Ctrl(b'd') => { @@ -514,7 +511,7 @@ impl Shell { async fn handle_key_readline( &mut self, key: textmode::Key, - event_w: async_std::channel::Sender, + event_w: tokio::sync::mpsc::UnboundedSender, ) -> Option { match key { textmode::Key::Char(c) => { -- cgit v1.2.3-54-g00ecf