From 5033f122cf47437767d9e9f4e8b2f7c0c37aa316 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Mon, 4 Nov 2019 14:06:39 -0500 Subject: refactor to read the entire ttyrec into memory --- src/cmd/play.rs | 222 +++++++++++++++++++++++++++++++------------------------- 1 file changed, 122 insertions(+), 100 deletions(-) diff --git a/src/cmd/play.rs b/src/cmd/play.rs index 427f611..e4cf9c1 100644 --- a/src/cmd/play.rs +++ b/src/cmd/play.rs @@ -51,6 +51,118 @@ pub fn config( Ok(Box::new(config)) } +struct Frame { + dur: std::time::Duration, + data: Vec, +} + +impl Frame { + fn adjusted_dur( + &self, + scale: f32, + clamp: Option, + ) -> std::time::Duration { + let scaled = self.dur.div_f32(scale); + clamp.map_or(scaled, |clamp| scaled.min(clamp)) + } +} + +#[derive(Default)] +struct Ttyrec { + frames: Vec, +} + +impl Ttyrec { + fn new() -> Self { + Self::default() + } + + fn add_frame(&mut self, frame: Frame) { + self.frames.push(frame); + } + + fn frame(&self, idx: usize) -> Option<&Frame> { + self.frames.get(idx) + } +} + +struct Player { + playback_ratio: f32, + max_frame_length: Option, + ttyrec: Ttyrec, + idx: usize, + timer: Option, + base_time: std::time::Instant, + played_amount: std::time::Duration, +} + +impl Player { + fn new( + playback_ratio: f32, + max_frame_length: Option, + ) -> Self { + Self { + playback_ratio, + max_frame_length, + ttyrec: Ttyrec::new(), + idx: 0, + timer: None, + base_time: std::time::Instant::now(), + played_amount: std::time::Duration::default(), + } + } + + fn base_time_incr(&mut self, incr: std::time::Duration) { + self.base_time += incr; + self.set_timer(); + } + + fn add_frame(&mut self, frame: Frame) { + self.ttyrec.add_frame(frame); + if self.timer.is_none() { + self.set_timer(); + } + } + + fn set_timer(&mut self) { + if let Some(frame) = self.ttyrec.frame(self.idx) { + self.timer = Some(tokio::timer::Delay::new( + self.base_time + + self.played_amount + + frame.adjusted_dur( + self.playback_ratio, + self.max_frame_length, + ), + )); + } else { + self.timer = None; + } + } + + fn poll(&mut self) -> futures::Poll>, Error> { + let frame = if let Some(frame) = self.ttyrec.frame(self.idx) { + frame + } else { + return Ok(futures::Async::Ready(None)); + }; + let timer = if let Some(timer) = &mut self.timer { + timer + } else { + return Ok(futures::Async::Ready(None)); + }; + + futures::try_ready!(timer.poll().context(crate::error::Sleep)); + let ret = frame.data.clone(); + + self.idx += 1; + self.played_amount += + frame.adjusted_dur(self.playback_ratio, self.max_frame_length); + self.set_timer(); + + Ok(futures::Async::Ready(Some(ret))) + } +} + #[allow(clippy::large_enum_variant)] enum FileState { Closed { @@ -68,16 +180,10 @@ enum FileState { struct PlaySession { file: FileState, - playback_ratio: f32, - max_frame_length: Option, - + player: Player, raw_screen: Option, key_reader: crate::key_reader::KeyReader, - to_write: DumbDelayQueue>, - // to_write: tokio::timer::delay_queue::DelayQueue>, - base_time: std::time::Instant, last_frame_time: std::time::Duration, - total_time_clamped: std::time::Duration, paused: Option, } @@ -91,16 +197,10 @@ impl PlaySession { file: FileState::Closed { filename: filename.to_string(), }, - playback_ratio, - max_frame_length, - + player: Player::new(playback_ratio, max_frame_length), raw_screen: None, key_reader: crate::key_reader::KeyReader::new(), - to_write: DumbDelayQueue::new(), - // to_write: tokio::timer::delay_queue::DelayQueue::new(), - base_time: std::time::Instant::now(), last_frame_time: std::time::Duration::default(), - total_time_clamped: std::time::Duration::default(), paused: None, } } @@ -114,9 +214,8 @@ impl PlaySession { ' ', )) => { if let Some(time) = self.paused.take() { - let diff = std::time::Instant::now() - time; - self.to_write.time_incr(diff); - self.base_time += diff; + self.player + .base_time_incr(std::time::Instant::now() - time); } else { self.paused = Some(std::time::Instant::now()); } @@ -174,20 +273,12 @@ impl PlaySession { .context(crate::error::ReadTtyrec)) { let frame_time = frame.time - reader.offset().unwrap(); - let frame_dur = (frame_time - self.last_frame_time) - .div_f32(self.playback_ratio); - self.total_time_clamped += self - .max_frame_length - .map_or(frame_dur, |max_frame_length| { - frame_dur.min(max_frame_length) - }); - - self.to_write.insert_at( - frame.data, - self.base_time + self.total_time_clamped, - ); - + let frame_dur = frame_time - self.last_frame_time; self.last_frame_time = frame_time; + self.player.add_frame(Frame { + dur: frame_dur, + data: frame.data, + }); } else { self.file = FileState::Eof; } @@ -228,11 +319,7 @@ impl PlaySession { return Ok(component_future::Async::NothingToDo); } - if let Some(data) = component_future::try_ready!(self - .to_write - .poll() - .context(crate::error::Sleep)) - { + if let Some(data) = component_future::try_ready!(self.player.poll()) { // TODO async let stdout = std::io::stdout(); let mut stdout = stdout.lock(); @@ -256,68 +343,3 @@ impl futures::future::Future for PlaySession { component_future::poll_future(self, Self::POLL_FNS) } } - -// XXX tokio's delay_queue implementation seems to have a bug when -// interleaving inserts and polls - if you insert some entries, then poll -// successfully, then insert some more entries, the task won't get notified to -// wake up until the first entry after the successful poll is ready, instead -// of when the next entry of the original set is ready -// NOTE: this implementation is, as its name indicates, pretty dumb - it -// requires the entries to be inserted in order or else it won't work. this is -// fine for reading ttyrecs, but is probably not great for a general purpose -// thing. -struct DumbDelayQueueEntry { - timer: tokio::timer::Delay, - data: T, -} - -struct DumbDelayQueue { - queue: std::collections::VecDeque>, -} - -impl DumbDelayQueue { - fn new() -> Self { - Self { - queue: std::collections::VecDeque::new(), - } - } - - fn insert_at(&mut self, data: T, time: std::time::Instant) { - self.queue.push_back(DumbDelayQueueEntry { - data, - timer: tokio::timer::Delay::new(time), - }) - } - - fn time_incr(&mut self, dur: std::time::Duration) { - for entry in &mut self.queue { - entry.timer.reset(entry.timer.deadline() + dur); - } - } -} - -#[must_use = "streams do nothing unless polled"] -impl futures::stream::Stream for DumbDelayQueue { - type Item = T; - type Error = tokio::timer::Error; - - fn poll(&mut self) -> futures::Poll, Self::Error> { - if let Some(mut entry) = self.queue.pop_front() { - match entry.timer.poll() { - Ok(futures::Async::Ready(_)) => { - Ok(futures::Async::Ready(Some(entry.data))) - } - Ok(futures::Async::NotReady) => { - self.queue.push_front(entry); - Ok(futures::Async::NotReady) - } - Err(e) => { - self.queue.push_front(entry); - Err(e) - } - } - } else { - Ok(futures::Async::Ready(None)) - } - } -} -- cgit v1.2.3-54-g00ecf