aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-11-04 14:06:39 -0500
committerJesse Luehrs <doy@tozt.net>2019-11-04 14:06:39 -0500
commit5033f122cf47437767d9e9f4e8b2f7c0c37aa316 (patch)
tree7b7bcff00d20fed6376ba83ca6c49cd577ca8a6e
parenta8622661af3716ef8a4718b6cc5df569e2f7e74e (diff)
downloadteleterm-5033f122cf47437767d9e9f4e8b2f7c0c37aa316.tar.gz
teleterm-5033f122cf47437767d9e9f4e8b2f7c0c37aa316.zip
refactor to read the entire ttyrec into memory
-rw-r--r--src/cmd/play.rs222
1 files changed, 122 insertions, 100 deletions
diff --git a/src/cmd/play.rs b/src/cmd/play.rs
index 427f611..e4cf9c1 100644
--- a/src/cmd/play.rs
+++ b/src/cmd/play.rs
@@ -51,6 +51,118 @@ pub fn config(
Ok(Box::new(config))
}
+struct Frame {
+ dur: std::time::Duration,
+ data: Vec<u8>,
+}
+
+impl Frame {
+ fn adjusted_dur(
+ &self,
+ scale: f32,
+ clamp: Option<std::time::Duration>,
+ ) -> std::time::Duration {
+ let scaled = self.dur.div_f32(scale);
+ clamp.map_or(scaled, |clamp| scaled.min(clamp))
+ }
+}
+
+#[derive(Default)]
+struct Ttyrec {
+ frames: Vec<Frame>,
+}
+
+impl Ttyrec {
+ fn new() -> Self {
+ Self::default()
+ }
+
+ fn add_frame(&mut self, frame: Frame) {
+ self.frames.push(frame);
+ }
+
+ fn frame(&self, idx: usize) -> Option<&Frame> {
+ self.frames.get(idx)
+ }
+}
+
+struct Player {
+ playback_ratio: f32,
+ max_frame_length: Option<std::time::Duration>,
+ ttyrec: Ttyrec,
+ idx: usize,
+ timer: Option<tokio::timer::Delay>,
+ base_time: std::time::Instant,
+ played_amount: std::time::Duration,
+}
+
+impl Player {
+ fn new(
+ playback_ratio: f32,
+ max_frame_length: Option<std::time::Duration>,
+ ) -> Self {
+ Self {
+ playback_ratio,
+ max_frame_length,
+ ttyrec: Ttyrec::new(),
+ idx: 0,
+ timer: None,
+ base_time: std::time::Instant::now(),
+ played_amount: std::time::Duration::default(),
+ }
+ }
+
+ fn base_time_incr(&mut self, incr: std::time::Duration) {
+ self.base_time += incr;
+ self.set_timer();
+ }
+
+ fn add_frame(&mut self, frame: Frame) {
+ self.ttyrec.add_frame(frame);
+ if self.timer.is_none() {
+ self.set_timer();
+ }
+ }
+
+ fn set_timer(&mut self) {
+ if let Some(frame) = self.ttyrec.frame(self.idx) {
+ self.timer = Some(tokio::timer::Delay::new(
+ self.base_time
+ + self.played_amount
+ + frame.adjusted_dur(
+ self.playback_ratio,
+ self.max_frame_length,
+ ),
+ ));
+ } else {
+ self.timer = None;
+ }
+ }
+
+ fn poll(&mut self) -> futures::Poll<Option<Vec<u8>>, Error> {
+ let frame = if let Some(frame) = self.ttyrec.frame(self.idx) {
+ frame
+ } else {
+ return Ok(futures::Async::Ready(None));
+ };
+ let timer = if let Some(timer) = &mut self.timer {
+ timer
+ } else {
+ return Ok(futures::Async::Ready(None));
+ };
+
+ futures::try_ready!(timer.poll().context(crate::error::Sleep));
+ let ret = frame.data.clone();
+
+ self.idx += 1;
+ self.played_amount +=
+ frame.adjusted_dur(self.playback_ratio, self.max_frame_length);
+ self.set_timer();
+
+ Ok(futures::Async::Ready(Some(ret)))
+ }
+}
+
#[allow(clippy::large_enum_variant)]
enum FileState {
Closed {
@@ -68,16 +180,10 @@ enum FileState {
struct PlaySession {
file: FileState,
- playback_ratio: f32,
- max_frame_length: Option<std::time::Duration>,
-
+ player: Player,
raw_screen: Option<crossterm::RawScreen>,
key_reader: crate::key_reader::KeyReader,
- to_write: DumbDelayQueue<Vec<u8>>,
- // to_write: tokio::timer::delay_queue::DelayQueue<Vec<u8>>,
- base_time: std::time::Instant,
last_frame_time: std::time::Duration,
- total_time_clamped: std::time::Duration,
paused: Option<std::time::Instant>,
}
@@ -91,16 +197,10 @@ impl PlaySession {
file: FileState::Closed {
filename: filename.to_string(),
},
- playback_ratio,
- max_frame_length,
-
+ player: Player::new(playback_ratio, max_frame_length),
raw_screen: None,
key_reader: crate::key_reader::KeyReader::new(),
- to_write: DumbDelayQueue::new(),
- // to_write: tokio::timer::delay_queue::DelayQueue::new(),
- base_time: std::time::Instant::now(),
last_frame_time: std::time::Duration::default(),
- total_time_clamped: std::time::Duration::default(),
paused: None,
}
}
@@ -114,9 +214,8 @@ impl PlaySession {
' ',
)) => {
if let Some(time) = self.paused.take() {
- let diff = std::time::Instant::now() - time;
- self.to_write.time_incr(diff);
- self.base_time += diff;
+ self.player
+ .base_time_incr(std::time::Instant::now() - time);
} else {
self.paused = Some(std::time::Instant::now());
}
@@ -174,20 +273,12 @@ impl PlaySession {
.context(crate::error::ReadTtyrec))
{
let frame_time = frame.time - reader.offset().unwrap();
- let frame_dur = (frame_time - self.last_frame_time)
- .div_f32(self.playback_ratio);
- self.total_time_clamped += self
- .max_frame_length
- .map_or(frame_dur, |max_frame_length| {
- frame_dur.min(max_frame_length)
- });
-
- self.to_write.insert_at(
- frame.data,
- self.base_time + self.total_time_clamped,
- );
-
+ let frame_dur = frame_time - self.last_frame_time;
self.last_frame_time = frame_time;
+ self.player.add_frame(Frame {
+ dur: frame_dur,
+ data: frame.data,
+ });
} else {
self.file = FileState::Eof;
}
@@ -228,11 +319,7 @@ impl PlaySession {
return Ok(component_future::Async::NothingToDo);
}
- if let Some(data) = component_future::try_ready!(self
- .to_write
- .poll()
- .context(crate::error::Sleep))
- {
+ if let Some(data) = component_future::try_ready!(self.player.poll()) {
// TODO async
let stdout = std::io::stdout();
let mut stdout = stdout.lock();
@@ -256,68 +343,3 @@ impl futures::future::Future for PlaySession {
component_future::poll_future(self, Self::POLL_FNS)
}
}
-
-// XXX tokio's delay_queue implementation seems to have a bug when
-// interleaving inserts and polls - if you insert some entries, then poll
-// successfully, then insert some more entries, the task won't get notified to
-// wake up until the first entry after the successful poll is ready, instead
-// of when the next entry of the original set is ready
-// NOTE: this implementation is, as its name indicates, pretty dumb - it
-// requires the entries to be inserted in order or else it won't work. this is
-// fine for reading ttyrecs, but is probably not great for a general purpose
-// thing.
-struct DumbDelayQueueEntry<T> {
- timer: tokio::timer::Delay,
- data: T,
-}
-
-struct DumbDelayQueue<T> {
- queue: std::collections::VecDeque<DumbDelayQueueEntry<T>>,
-}
-
-impl<T> DumbDelayQueue<T> {
- fn new() -> Self {
- Self {
- queue: std::collections::VecDeque::new(),
- }
- }
-
- fn insert_at(&mut self, data: T, time: std::time::Instant) {
- self.queue.push_back(DumbDelayQueueEntry {
- data,
- timer: tokio::timer::Delay::new(time),
- })
- }
-
- fn time_incr(&mut self, dur: std::time::Duration) {
- for entry in &mut self.queue {
- entry.timer.reset(entry.timer.deadline() + dur);
- }
- }
-}
-
-#[must_use = "streams do nothing unless polled"]
-impl<T> futures::stream::Stream for DumbDelayQueue<T> {
- type Item = T;
- type Error = tokio::timer::Error;
-
- fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
- if let Some(mut entry) = self.queue.pop_front() {
- match entry.timer.poll() {
- Ok(futures::Async::Ready(_)) => {
- Ok(futures::Async::Ready(Some(entry.data)))
- }
- Ok(futures::Async::NotReady) => {
- self.queue.push_front(entry);
- Ok(futures::Async::NotReady)
- }
- Err(e) => {
- self.queue.push_front(entry);
- Err(e)
- }
- }
- } else {
- Ok(futures::Async::Ready(None))
- }
- }
-}