From 5396669e46f1306d930e5d92748eeb302fccaa9a Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Thu, 24 Feb 2022 05:32:52 -0500 Subject: move to tokio --- src/bin/ttyplay/event.rs | 46 ++++++----- src/bin/ttyplay/frames.rs | 39 ++++----- src/bin/ttyplay/input.rs | 17 ++-- src/bin/ttyplay/main.rs | 19 ++--- src/bin/ttyplay/timer.rs | 199 +++++++++++++++++++++++----------------------- src/bin/ttyrec/main.rs | 91 ++++++++++----------- 6 files changed, 198 insertions(+), 213 deletions(-) (limited to 'src') diff --git a/src/bin/ttyplay/event.rs b/src/bin/ttyplay/event.rs index 534b761..0be477e 100644 --- a/src/bin/ttyplay/event.rs +++ b/src/bin/ttyplay/event.rs @@ -1,3 +1,4 @@ +#[derive(Debug)] pub enum Event { FrameTransition((usize, Box)), FrameLoaded(Option), @@ -13,6 +14,7 @@ pub enum Event { Quit, } +#[derive(Debug)] pub enum TimerAction { Pause, FirstFrame, @@ -27,23 +29,26 @@ pub enum TimerAction { } struct Reader { - pending: async_std::sync::Mutex, - cvar: async_std::sync::Condvar, + pending: tokio::sync::Mutex, + // XXX not quite a condvar, but i think close enough as long as there is + // only ever one consumer? see + // https://github.com/tokio-rs/tokio/issues/3892 + cvar: tokio::sync::Notify, } impl Reader { fn new( - input: async_std::channel::Receiver, - ) -> async_std::sync::Arc { + mut input: tokio::sync::mpsc::UnboundedReceiver, + ) -> std::sync::Arc { let this = Self { - pending: async_std::sync::Mutex::new(Pending::new()), - cvar: async_std::sync::Condvar::new(), + pending: tokio::sync::Mutex::new(Pending::new()), + cvar: tokio::sync::Notify::new(), }; - let this = async_std::sync::Arc::new(this); + let this = std::sync::Arc::new(this); { let this = this.clone(); - async_std::task::spawn(async move { - while let Ok(event) = input.recv().await { + tokio::task::spawn(async move { + while let Some(event) = input.recv().await { this.event(event).await; } this.event(Event::Quit).await; @@ -53,13 +58,14 @@ impl Reader { } async fn read(&self) -> Option { - let mut pending = self - .cvar - .wait_until(self.pending.lock().await, |pending| { - pending.has_event() - }) - .await; - pending.get_event() + loop { + let mut pending = self.pending.lock().await; + if pending.has_event() { + return pending.get_event(); + } + drop(pending); + self.cvar.notified().await; + } } async fn event(&self, event: Event) { @@ -197,8 +203,8 @@ impl Pending { } pub async fn handle_events( - event_r: async_std::channel::Receiver, - timer_w: async_std::channel::Sender, + event_r: tokio::sync::mpsc::UnboundedReceiver, + timer_w: tokio::sync::mpsc::UnboundedSender, mut output: textmode::Output, ) -> anyhow::Result<()> { let mut display = crate::display::Display::new(); @@ -206,7 +212,7 @@ pub async fn handle_events( while let Some(event) = events.read().await { match event { Event::TimerAction(action) => { - timer_w.send(action).await?; + timer_w.send(action)?; continue; } Event::FrameTransition((idx, screen)) => { @@ -240,7 +246,7 @@ pub async fn handle_events( } Event::RunSearch(s, backwards) => { display.clear_search(); - timer_w.send(TimerAction::Search(s, backwards)).await?; + timer_w.send(TimerAction::Search(s, backwards))?; } Event::Error(e) => { return Err(e); diff --git a/src/bin/ttyplay/frames.rs b/src/bin/ttyplay/frames.rs index d6073dc..1cf2d01 100644 --- a/src/bin/ttyplay/frames.rs +++ b/src/bin/ttyplay/frames.rs @@ -21,13 +21,13 @@ impl Frame { pub struct FrameData { frames: Vec, done_reading: bool, - new_frame_w: async_std::channel::Sender>, - new_frame_r: async_std::channel::Receiver>, + new_frame_w: tokio::sync::watch::Sender>, + new_frame_r: tokio::sync::watch::Receiver>, } impl FrameData { pub fn new() -> Self { - let (new_frame_w, new_frame_r) = async_std::channel::unbounded(); + let (new_frame_w, new_frame_r) = tokio::sync::watch::channel(Some(0)); Self { frames: vec![], done_reading: false, @@ -72,7 +72,6 @@ impl FrameData { self.frames.push(frame); self.new_frame_w .send(Some(self.frames.len())) - .await // new_frame_w is never closed, so this can never fail .unwrap(); } @@ -81,7 +80,6 @@ impl FrameData { self.done_reading = true; self.new_frame_w .send(None) - .await // new_frame_w is never closed, so this can never fail .unwrap(); } @@ -98,16 +96,15 @@ impl FrameData { if self.done_reading { return Box::pin(std::future::ready(false)); } - let new_frame_r = self.new_frame_r.clone(); + let mut new_frame_r = self.new_frame_r.clone(); Box::pin(async move { - while let Some(new_len) = new_frame_r - .recv() - .await - // new_frame_r is never closed, so this can never fail - .unwrap() - { - if i < new_len { - return true; + while new_frame_r.changed().await.is_ok() { + if let Some(new_len) = *new_frame_r.borrow() { + if i < new_len { + return true; + } + } else { + break; } } false @@ -116,13 +113,13 @@ impl FrameData { } pub fn load_from_file( - frames: async_std::sync::Arc>, - fh: async_std::fs::File, - event_w: async_std::channel::Sender, + frames: std::sync::Arc>, + fh: tokio::fs::File, + event_w: tokio::sync::mpsc::UnboundedSender, clamp: Option, ) { let clamp = clamp.map(std::time::Duration::from_millis); - async_std::task::spawn(async move { + tokio::task::spawn(async move { let mut reader = ttyrec::Reader::new(fh); let size = terminal_size::terminal_size().map_or( (24, 80), @@ -144,21 +141,19 @@ pub fn load_from_file( } } parser.process(&frame.data); - let mut frames = frames.lock_arc().await; + let mut frames = frames.clone().lock_owned().await; frames .add_frame(Frame::new(parser.screen().clone(), delay)) .await; event_w .send(crate::event::Event::FrameLoaded(Some(frames.count()))) - .await // event_w is never closed, so this can never fail .unwrap(); prev_delay = delay; } - frames.lock_arc().await.done_reading().await; + frames.lock_owned().await.done_reading().await; event_w .send(crate::event::Event::FrameLoaded(None)) - .await // event_w is never closed, so this can never fail .unwrap(); }); diff --git a/src/bin/ttyplay/input.rs b/src/bin/ttyplay/input.rs index 81a2fef..fe7dc30 100644 --- a/src/bin/ttyplay/input.rs +++ b/src/bin/ttyplay/input.rs @@ -1,18 +1,17 @@ -pub fn spawn_task( - event_w: async_std::channel::Sender, - mut input: textmode::Input, +pub fn spawn_thread( + event_w: tokio::sync::mpsc::UnboundedSender, + mut input: textmode::blocking::Input, ) { - async_std::task::spawn(async move { + std::thread::spawn(move || { let mut search: Option = None; let mut prev_search = None; loop { - let key = match input.read_key().await { + let key = match input.read_key() { Ok(Some(key)) => key, Ok(None) => break, Err(e) => { event_w .send(crate::event::Event::Error(anyhow::anyhow!(e))) - .await // event_w is never closed, so this can never fail .unwrap(); break; @@ -26,7 +25,6 @@ pub fn spawn_task( .send(crate::event::Event::ActiveSearch( search_contents.clone(), )) - .await // event_w is never closed, so this can never fail .unwrap(); } @@ -36,7 +34,6 @@ pub fn spawn_task( .send(crate::event::Event::ActiveSearch( search_contents.clone(), )) - .await // event_w is never closed, so this can never fail .unwrap(); } @@ -46,7 +43,6 @@ pub fn spawn_task( search_contents.clone(), false, )) - .await // event_w is never closed, so this can never fail .unwrap(); prev_search = search; @@ -55,7 +51,6 @@ pub fn spawn_task( textmode::Key::Escape => { event_w .send(crate::event::Event::CancelSearch) - .await // event_w is never closed, so this can never fail .unwrap(); search = None; @@ -138,7 +133,7 @@ pub fn spawn_task( _ => continue, }; // event_w is never closed, so this can never fail - event_w.send(event).await.unwrap(); + event_w.send(event).unwrap(); } } }); diff --git a/src/bin/ttyplay/main.rs b/src/bin/ttyplay/main.rs index 499c7b9..832b870 100644 --- a/src/bin/ttyplay/main.rs +++ b/src/bin/ttyplay/main.rs @@ -56,6 +56,7 @@ struct Opt { speed: u32, } +#[tokio::main] async fn async_main(opt: Opt) -> anyhow::Result<()> { let Opt { file, @@ -66,19 +67,19 @@ async fn async_main(opt: Opt) -> anyhow::Result<()> { let speed = speed.clamp(0, 8); - let fh = async_std::fs::File::open(file).await?; + let fh = tokio::fs::File::open(file).await?; - let mut input = textmode::Input::new().await?; + let mut input = textmode::blocking::Input::new()?; let mut output = textmode::Output::new().await?; let _input_guard = input.take_raw_guard(); let _output_guard = output.take_screen_guard(); - let (event_w, event_r) = async_std::channel::unbounded(); - let (timer_w, timer_r) = async_std::channel::unbounded(); + let (event_w, event_r) = tokio::sync::mpsc::unbounded_channel(); + let (timer_w, timer_r) = tokio::sync::mpsc::unbounded_channel(); - input::spawn_task(event_w.clone(), input); + input::spawn_thread(event_w.clone(), input); - let frame_data = async_std::sync::Arc::new(async_std::sync::Mutex::new( + let frame_data = std::sync::Arc::new(tokio::sync::Mutex::new( frames::FrameData::new(), )); frames::load_from_file(frame_data.clone(), fh, event_w.clone(), clamp); @@ -93,15 +94,15 @@ async fn async_main(opt: Opt) -> anyhow::Result<()> { event::handle_events(event_r, timer_w.clone(), output).await?; - timer_w.send(event::TimerAction::Quit).await?; - timer_task.await; + timer_w.send(event::TimerAction::Quit)?; + timer_task.await?; Ok(()) } #[paw::main] fn main(opt: Opt) { - match async_std::task::block_on(async_main(opt)) { + match async_main(opt) { Ok(_) => (), Err(e) => { eprintln!("ttyplay: {}", e); diff --git a/src/bin/ttyplay/timer.rs b/src/bin/ttyplay/timer.rs index 12ff17c..0b918aa 100644 --- a/src/bin/ttyplay/timer.rs +++ b/src/bin/ttyplay/timer.rs @@ -1,21 +1,18 @@ -use async_std::prelude::FutureExt as _; - pub fn spawn_task( - event_w: async_std::channel::Sender, - frames: async_std::sync::Arc< - async_std::sync::Mutex, + event_w: tokio::sync::mpsc::UnboundedSender, + frames: std::sync::Arc>, + mut timer_r: tokio::sync::mpsc::UnboundedReceiver< + crate::event::TimerAction, >, - timer_r: async_std::channel::Receiver, pause_at_start: bool, speed: u32, -) -> async_std::task::JoinHandle<()> { - async_std::task::spawn(async move { +) -> tokio::task::JoinHandle<()> { + tokio::task::spawn(async move { let mut idx = 0; let mut start_time = std::time::Instant::now(); let mut paused_time = if pause_at_start { event_w .send(crate::event::Event::Paused(true)) - .await // event_w is never closed, so this can never fail .unwrap(); Some(start_time) @@ -25,20 +22,17 @@ pub fn spawn_task( let mut force_update_time = false; let mut playback_ratio = 2_u32.pow(speed); loop { - enum Res { - Wait(Option>), - TimerAction( - Result< - crate::event::TimerAction, - async_std::channel::RecvError, - >, - ), - } let wait = async { - let wait_read = frames.lock_arc().await.wait_for_frame(idx); + let wait_read = + frames.clone().lock_owned().await.wait_for_frame(idx); if wait_read.await { - let frame = - frames.lock_arc().await.get(idx).unwrap().clone(); + let frame = frames + .clone() + .lock_owned() + .await + .get(idx) + .unwrap() + .clone(); if force_update_time { let now = std::time::Instant::now(); start_time = now - frame.delay() * playback_ratio / 16 @@ -54,7 +48,7 @@ pub fn spawn_task( } else if paused_time.is_some() { std::future::pending::<()>().await; } else { - async_std::task::sleep( + tokio::time::sleep( (start_time + frame.delay() * playback_ratio / 16) .saturating_duration_since( @@ -63,117 +57,120 @@ pub fn spawn_task( ) .await; } - Res::Wait(Some(Box::new(frame.into_screen()))) + Some(Box::new(frame.into_screen())) } else { - Res::Wait(None) + None } }; - let action = async { Res::TimerAction(timer_r.recv().await) }; - match wait.race(action).await { - Res::Wait(Some(screen)) => { + tokio::select! { + screen = wait => if let Some(screen) = screen { event_w .send(crate::event::Event::FrameTransition(( idx, screen, ))) - .await // event_w is never closed, so this can never fail .unwrap(); idx += 1; } - Res::Wait(None) => { - idx = frames.lock_arc().await.count() - 1; + else { + idx = frames.clone().lock_owned().await.count() - 1; paused_time = Some(std::time::Instant::now()); event_w .send(crate::event::Event::Paused(true)) - .await // event_w is never closed, so this can never fail .unwrap(); - } - Res::TimerAction(Ok(action)) => match action { - crate::event::TimerAction::Pause => { - let now = std::time::Instant::now(); - if let Some(time) = paused_time.take() { - start_time += now - time; - } else { - paused_time = Some(now); - } - event_w - .send(crate::event::Event::Paused( - paused_time.is_some(), - )) - .await - // event_w is never closed, so this can never fail - .unwrap(); - } - crate::event::TimerAction::FirstFrame => { - idx = 0; - force_update_time = true; - } - crate::event::TimerAction::LastFrame => { - idx = frames.lock_arc().await.count() - 1; - force_update_time = true; - } - // force_update_time will immediately transition to the - // next frame and do idx += 1 on its own - crate::event::TimerAction::NextFrame => { - force_update_time = true; - } - crate::event::TimerAction::PreviousFrame => { - idx = idx.saturating_sub(2); - force_update_time = true; - } - crate::event::TimerAction::SpeedUp => { - if playback_ratio > 1 { - playback_ratio /= 2; + }, + action = timer_r.recv() => match action { + Some(action) => match action { + crate::event::TimerAction::Pause => { let now = std::time::Instant::now(); - start_time = now - (now - start_time) / 2; + if let Some(time) = paused_time.take() { + start_time += now - time; + } else { + paused_time = Some(now); + } event_w - .send(crate::event::Event::Speed( - playback_ratio, + .send(crate::event::Event::Paused( + paused_time.is_some(), )) - .await // event_w is never closed, so this can never // fail .unwrap(); } - } - crate::event::TimerAction::SlowDown => { - if playback_ratio < 256 { - playback_ratio *= 2; + crate::event::TimerAction::FirstFrame => { + idx = 0; + force_update_time = true; + } + crate::event::TimerAction::LastFrame => { + idx = + frames.clone().lock_owned().await.count() - 1; + force_update_time = true; + } + // force_update_time will immediately transition to the + // next frame and do idx += 1 on its own + crate::event::TimerAction::NextFrame => { + force_update_time = true; + } + crate::event::TimerAction::PreviousFrame => { + idx = idx.saturating_sub(2); + force_update_time = true; + } + crate::event::TimerAction::SpeedUp => { + if playback_ratio > 1 { + playback_ratio /= 2; + let now = std::time::Instant::now(); + start_time = now - (now - start_time) / 2; + event_w + .send(crate::event::Event::Speed( + playback_ratio, + )) + // event_w is never closed, so this can + // never fail + .unwrap(); + } + } + crate::event::TimerAction::SlowDown => { + if playback_ratio < 256 { + playback_ratio *= 2; + let now = std::time::Instant::now(); + start_time = now - (now - start_time) * 2; + event_w + .send(crate::event::Event::Speed( + playback_ratio, + )) + // event_w is never closed, so this can + // never fail + .unwrap(); + } + } + crate::event::TimerAction::DefaultSpeed => { let now = std::time::Instant::now(); - start_time = now - (now - start_time) * 2; + start_time = now + - (((now - start_time) * 16) / playback_ratio); + playback_ratio = 16; event_w - .send(crate::event::Event::Speed( - playback_ratio, - )) - .await + .send( + crate::event::Event::Speed(playback_ratio) + ) // event_w is never closed, so this can never // fail .unwrap(); } - } - crate::event::TimerAction::DefaultSpeed => { - let now = std::time::Instant::now(); - start_time = now - - (((now - start_time) * 16) / playback_ratio); - playback_ratio = 16; - event_w - .send(crate::event::Event::Speed(playback_ratio)) - .await - // event_w is never closed, so this can never fail - .unwrap(); - } - crate::event::TimerAction::Search(s, backwards) => { - if let Some(new_idx) = - frames.lock_arc().await.search(idx, &s, backwards) - { - idx = new_idx; - force_update_time = true; + crate::event::TimerAction::Search(s, backwards) => { + if let Some(new_idx) = + frames.clone() + .lock_owned() + .await + .search(idx, &s, backwards) + { + idx = new_idx; + force_update_time = true; + } } + crate::event::TimerAction::Quit => break, } - crate::event::TimerAction::Quit => break, + None => unreachable!(), }, - Res::TimerAction(Err(e)) => panic!("{}", e), } } }) diff --git a/src/bin/ttyrec/main.rs b/src/bin/ttyrec/main.rs index 8278998..1dbba54 100644 --- a/src/bin/ttyrec/main.rs +++ b/src/bin/ttyrec/main.rs @@ -11,10 +11,7 @@ #![allow(clippy::too_many_lines)] #![allow(clippy::type_complexity)] -use async_std::io::{ReadExt as _, WriteExt as _}; -use async_std::prelude::FutureExt as _; -use async_std::stream::StreamExt as _; -use pty_process::Command as _; +use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; #[derive(Debug, structopt::StructOpt)] #[structopt( @@ -55,42 +52,46 @@ fn get_cmd( ) } +#[derive(Debug)] enum Event { Key(textmode::Result>), Stdout(std::io::Result>), Resize((u16, u16)), Error(anyhow::Error), + Quit, } +#[tokio::main] async fn async_main(opt: Opt) -> anyhow::Result<()> { let Opt { cmd, file } = opt; let (cmd, args) = get_cmd(cmd); - let fh = async_std::fs::File::create(file).await?; + let fh = tokio::fs::File::create(file).await?; - let mut input = textmode::Input::new().await?; + let mut input = textmode::blocking::Input::new()?; let _input_guard = input.take_raw_guard(); - let mut stdout = async_std::io::stdout(); + let mut stdout = tokio::io::stdout(); let size = terminal_size::terminal_size().map_or( (24, 80), |(terminal_size::Width(w), terminal_size::Height(h))| (h, w), ); - let child = async_std::process::Command::new(cmd) - .args(args) - .spawn_pty(Some(&pty_process::Size::new(size.0, size.1)))?; + let mut pty = pty_process::Pty::new()?; + pty.resize(pty_process::Size::new(size.0, size.1))?; + let pts = pty.pts()?; + let mut child = pty_process::Command::new(cmd).args(args).spawn(&pts)?; - let (event_w, event_r) = async_std::channel::unbounded(); - let (input_w, input_r) = async_std::channel::unbounded(); - let (resize_w, resize_r) = async_std::channel::unbounded(); + let (event_w, mut event_r) = tokio::sync::mpsc::unbounded_channel(); + let (input_w, mut input_r) = tokio::sync::mpsc::unbounded_channel(); + let (resize_w, mut resize_r) = tokio::sync::mpsc::unbounded_channel(); { - let mut signals = signal_hook_async_std::Signals::new(&[ - signal_hook::consts::signal::SIGWINCH, - ])?; + let mut signals = tokio::signal::unix::signal( + tokio::signal::unix::SignalKind::window_change(), + )?; let event_w = event_w.clone(); - async_std::task::spawn(async move { - while signals.next().await.is_some() { + tokio::task::spawn(async move { + while signals.recv().await.is_some() { event_w .send(Event::Resize( terminal_size::terminal_size().map_or( @@ -101,7 +102,6 @@ async fn async_main(opt: Opt) -> anyhow::Result<()> { )| { (h, w) }, ), )) - .await // event_w is never closed, so this can never fail .unwrap(); } @@ -110,11 +110,10 @@ async fn async_main(opt: Opt) -> anyhow::Result<()> { { let event_w = event_w.clone(); - async_std::task::spawn(async move { + std::thread::spawn(move || { loop { event_w - .send(Event::Key(input.read_key().await)) - .await + .send(Event::Key(input.read_key())) // event_w is never closed, so this can never fail .unwrap(); } @@ -123,57 +122,50 @@ async fn async_main(opt: Opt) -> anyhow::Result<()> { { let event_w = event_w.clone(); - async_std::task::spawn(async move { + tokio::task::spawn(async move { loop { - enum Res { - Read(Result), - Write(Result, async_std::channel::RecvError>), - Resize(Result<(u16, u16), async_std::channel::RecvError>), - } let mut buf = [0_u8; 4096]; - let mut pty = child.pty(); - let read = async { Res::Read(pty.read(&mut buf).await) }; - let write = async { Res::Write(input_r.recv().await) }; - let resize = async { Res::Resize(resize_r.recv().await) }; - match read.race(write).race(resize).await { - Res::Read(res) => { + tokio::select! { + res = pty.read(&mut buf) => { let res = res.map(|n| buf[..n].to_vec()); let err = res.is_err(); event_w .send(Event::Stdout(res)) - .await // event_w is never closed, so this can never fail .unwrap(); if err { + eprintln!("pty read failed: {}", err); break; } } - Res::Write(res) => { + res = input_r.recv() => { // input_r is never closed, so this can never fail - let bytes = res.unwrap(); + let bytes: Vec = res.unwrap(); if let Err(e) = pty.write(&bytes).await { event_w .send(Event::Error(anyhow::anyhow!(e))) - .await // event_w is never closed, so this can never // fail .unwrap(); } } - Res::Resize(res) => { + res = resize_r.recv() => { // resize_r is never closed, so this can never fail - let size = res.unwrap(); - if let Err(e) = child.resize_pty( - &pty_process::Size::new(size.0, size.1), + let size: (u16, u16) = res.unwrap(); + if let Err(e) = pty.resize( + pty_process::Size::new(size.0, size.1), ) { event_w .send(Event::Error(anyhow::anyhow!(e))) - .await // event_w is never closed, so this can never // fail .unwrap(); } } + _ = child.wait() => { + event_w.send(Event::Quit).unwrap(); + break; + } } } }); @@ -181,11 +173,12 @@ async fn async_main(opt: Opt) -> anyhow::Result<()> { let mut writer = ttyrec::Writer::new(fh); loop { - match event_r.recv().await? { + // XXX unwrap + match event_r.recv().await.unwrap() { Event::Key(key) => { let key = key?; if let Some(key) = key { - input_w.send(key.into_bytes()).await?; + input_w.send(key.into_bytes()).unwrap(); } else { break; } @@ -197,18 +190,16 @@ async fn async_main(opt: Opt) -> anyhow::Result<()> { stdout.flush().await?; } Err(e) => { - if e.raw_os_error() == Some(libc::EIO) { - break; - } anyhow::bail!("failed to read from child process: {}", e); } }, Event::Resize((h, w)) => { - resize_w.send((h, w)).await?; + resize_w.send((h, w)).unwrap(); } Event::Error(e) => { return Err(e); } + Event::Quit => break, } } @@ -217,7 +208,7 @@ async fn async_main(opt: Opt) -> anyhow::Result<()> { #[paw::main] fn main(opt: Opt) { - match async_std::task::block_on(async_main(opt)) { + match async_main(opt) { Ok(_) => (), Err(e) => { eprintln!("ttyrec: {}", e); -- cgit v1.2.3-54-g00ecf