From 8f13228756dea88f1b8bdeaba1b46db400bab68c Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Fri, 25 Oct 2019 05:59:21 -0400 Subject: simplify ttyrec implementation --- src/cmd/play.rs | 10 +- src/cmd/record.rs | 36 +++---- src/ttyrec.rs | 290 ++++++++++++++++++++++++++++-------------------------- 3 files changed, 175 insertions(+), 161 deletions(-) diff --git a/src/cmd/play.rs b/src/cmd/play.rs index 368e368..d73dcb4 100644 --- a/src/cmd/play.rs +++ b/src/cmd/play.rs @@ -50,7 +50,7 @@ enum FileState { fut: tokio::fs::file::OpenFuture, }, Open { - file: crate::ttyrec::File, + reader: crate::ttyrec::Reader, }, Eof, } @@ -106,8 +106,8 @@ impl PlaySession { filename: filename.to_string(), } })); - let file = crate::ttyrec::File::new(file); - self.file = FileState::Open { file }; + let reader = crate::ttyrec::Reader::new(file); + self.file = FileState::Open { reader }; Ok(component_future::Async::DidWork) } _ => Ok(component_future::Async::NothingToDo), @@ -115,9 +115,9 @@ impl PlaySession { } fn poll_read_file(&mut self) -> component_future::Poll<(), Error> { - if let FileState::Open { file } = &mut self.file { + if let FileState::Open { reader } = &mut self.file { if let Some(frame) = - component_future::try_ready!(file.poll_read()) + component_future::try_ready!(reader.poll_read()) { self.to_write .insert_at(frame.data, self.base_time + frame.time); diff --git a/src/cmd/record.rs b/src/cmd/record.rs index 6fda88f..46e76e1 100644 --- a/src/cmd/record.rs +++ b/src/cmd/record.rs @@ -62,7 +62,7 @@ enum FileState { fut: tokio::fs::file::CreateFuture, }, Open { - file: crate::ttyrec::File, + writer: crate::ttyrec::Writer, }, } @@ -141,9 +141,11 @@ impl RecordSession { filename: filename.clone(), } })); - let mut file = crate::ttyrec::File::new(file); - file.write_frame(self.buffer.contents())?; - self.file = FileState::Open { file }; + let mut writer = crate::ttyrec::Writer::new(file); + if !self.buffer.contents().is_empty() { + writer.frame(self.buffer.contents())?; + } + self.file = FileState::Open { writer }; Ok(component_future::Async::DidWork) } FileState::Open { .. } => { @@ -173,8 +175,8 @@ impl RecordSession { } tokio_pty_process_stream::Event::Output { data } => { self.record_bytes(&data); - if let FileState::Open { file } = &mut self.file { - file.write_frame(&data)?; + if let FileState::Open { writer } = &mut self.file { + writer.frame(&data)?; } } } @@ -222,23 +224,23 @@ impl RecordSession { } fn poll_write_file(&mut self) -> component_future::Poll<(), Error> { - let file = match &mut self.file { - FileState::Open { file } => file, + let writer = match &mut self.file { + FileState::Open { writer } => writer, _ => { return Ok(component_future::Async::NothingToDo); } }; - match file.poll_write()? { - futures::Async::Ready(()) => Ok(component_future::Async::DidWork), - futures::Async::NotReady => { - // ship all data to the server before actually ending - if self.done && file.is_empty() { - Ok(component_future::Async::Ready(())) - } else { - Ok(component_future::Async::NotReady) - } + if writer.is_empty() { + // finish writing to the file before actually ending + if self.done { + Ok(component_future::Async::Ready(())) + } else { + Ok(component_future::Async::NothingToDo) } + } else { + component_future::try_ready!(writer.poll_write()); + Ok(component_future::Async::DidWork) } } } diff --git a/src/ttyrec.rs b/src/ttyrec.rs index 2312181..2ccefdc 100644 --- a/src/ttyrec.rs +++ b/src/ttyrec.rs @@ -1,6 +1,5 @@ use crate::prelude::*; use std::convert::TryFrom as _; -use tokio::io::{AsyncRead as _, AsyncWrite as _}; pub struct Frame { pub time: std::time::Duration, @@ -31,173 +30,186 @@ impl std::convert::TryFrom for Vec { } } -pub struct File { - file: tokio::fs::File, - base_time: Option, - waiting: usize, - wframe: futures::sink::Wait>, - rframe: tokio::sync::mpsc::UnboundedReceiver, - writing: std::collections::VecDeque, - read_buf: [u8; 4096], +#[repr(packed)] +struct Header { + secs: u32, + micros: u32, + len: u32, +} + +impl Header { + fn time(&self) -> std::time::Duration { + std::time::Duration::from_micros(u64::from( + self.secs * 1_000_000 + self.micros, + )) + } + + fn len(&self) -> usize { + self.len as usize + } +} + +pub struct Parser { reading: std::collections::VecDeque, - read_state: Option<(u32, u32, u32)>, - read: std::collections::VecDeque, + read_state: Option
, } -impl File { - pub fn new(file: tokio::fs::File) -> Self { - let (wframe, rframe) = tokio::sync::mpsc::unbounded_channel(); +impl Parser { + pub fn new() -> Self { Self { - file, - base_time: None, - waiting: 0, - wframe: wframe.wait(), - rframe, - writing: std::collections::VecDeque::new(), - read_buf: [0; 4096], reading: std::collections::VecDeque::new(), read_state: None, - read: std::collections::VecDeque::new(), } } - pub fn write_frame(&mut self, data: &[u8]) -> Result<()> { - let now = std::time::Instant::now(); - let base_time = if let Some(base_time) = &self.base_time { - *base_time + pub fn add_bytes(&mut self, bytes: &[u8]) { + self.reading.extend(bytes.iter()); + } + + pub fn next_frame(&mut self) -> Option { + let header = if let Some(header) = &self.read_state { + header } else { - self.base_time = Some(now); - now + if self.reading.len() < std::mem::size_of::
() { + return None; + } + + let secs1 = self.reading.pop_front().unwrap(); + let secs2 = self.reading.pop_front().unwrap(); + let secs3 = self.reading.pop_front().unwrap(); + let secs4 = self.reading.pop_front().unwrap(); + let secs = u32::from_le_bytes([secs1, secs2, secs3, secs4]); + + let usecs1 = self.reading.pop_front().unwrap(); + let usecs2 = self.reading.pop_front().unwrap(); + let usecs3 = self.reading.pop_front().unwrap(); + let usecs4 = self.reading.pop_front().unwrap(); + let micros = u32::from_le_bytes([usecs1, usecs2, usecs3, usecs4]); + + let len1 = self.reading.pop_front().unwrap(); + let len2 = self.reading.pop_front().unwrap(); + let len3 = self.reading.pop_front().unwrap(); + let len4 = self.reading.pop_front().unwrap(); + let len = u32::from_le_bytes([len1, len2, len3, len4]); + + let header = Header { secs, micros, len }; + self.read_state = Some(header); + self.read_state.as_ref().unwrap() }; - self.waiting += 1; - self.wframe - .send(Frame { - time: now - base_time, - data: data.to_vec(), - }) - .context(crate::error::WriteChannel) + if self.reading.len() < header.len() { + return None; + } + + let mut data = vec![]; + for _ in 0..header.len() { + data.push(self.reading.pop_front().unwrap()); + } + + let time = header.time(); + + self.read_state = None; + Some(Frame { time, data }) } +} - pub fn poll_write( - &mut self, - ) -> std::result::Result, Error> { - loop { - if self.writing.is_empty() { - match self.rframe.poll().context(crate::error::ReadChannel)? { - futures::Async::Ready(Some(frame)) => { - self.writing - .extend(Vec::::try_from(frame)?.iter()); - self.waiting -= 1; - } - futures::Async::Ready(None) => unreachable!(), - futures::Async::NotReady => { - return Ok(futures::Async::NotReady); - } - } - } +pub struct Creator { + base_time: Option, +} - let (a, b) = self.writing.as_slices(); - let buf = if a.is_empty() { b } else { a }; - match self - .file - .poll_write(buf) - .context(crate::error::WriteFile)? - { - futures::Async::Ready(n) => { - for _ in 0..n { - self.writing.pop_front(); - } - } - futures::Async::NotReady => { - return Ok(futures::Async::NotReady); - } - } +impl Creator { + pub fn new() -> Self { + Self { base_time: None } + } + + pub fn frame(&mut self, data: &[u8]) -> Result> { + let cur_time = std::time::Instant::now(); + let base_time = if let Some(base_time) = &self.base_time { + base_time + } else { + self.base_time = Some(cur_time); + self.base_time.as_ref().unwrap() + }; + Vec::::try_from(Frame { + time: cur_time - *base_time, + data: data.to_vec(), + }) + } +} + +pub struct Reader { + reader: R, + parser: Parser, + read_buf: [u8; 4096], + done_reading: bool, +} + +impl Reader { + pub fn new(reader: R) -> Self { + Self { + reader, + parser: Parser::new(), + read_buf: [0; 4096], + done_reading: false, } } - pub fn poll_read( - &mut self, - ) -> std::result::Result>, Error> { + pub fn poll_read(&mut self) -> futures::Poll, Error> { loop { - if let Some(frame) = self.read.pop_front() { + if let Some(frame) = self.parser.next_frame() { return Ok(futures::Async::Ready(Some(frame))); + } else if self.done_reading { + return Ok(futures::Async::Ready(None)); } - match self - .file + let n = futures::try_ready!(self + .reader .poll_read(&mut self.read_buf) - .context(crate::error::ReadFile)? - { - futures::Async::Ready(n) => { - if n > 0 { - self.reading.extend(self.read_buf[..n].iter()); - self.parse_frames(); - } else { - return Ok(futures::Async::Ready(None)); - } - } - futures::Async::NotReady => { - return Ok(futures::Async::NotReady); - } + .context(crate::error::ReadFile)); + if n > 0 { + self.parser.add_bytes(&self.read_buf[..n]); + } else { + self.done_reading = true; } } } +} - pub fn is_empty(&self) -> bool { - self.writing.is_empty() && self.waiting == 0 +pub struct Writer { + writer: W, + creator: Creator, + to_write: std::collections::VecDeque, +} + +impl Writer { + pub fn new(writer: W) -> Self { + Self { + writer, + creator: Creator::new(), + to_write: std::collections::VecDeque::new(), + } } - fn parse_frames(&mut self) { - loop { - match self.read_state { - Some((secs, usecs, len)) => { - if self.reading.len() < len as usize { - break; - } - - let mut data = vec![]; - for _ in 0..len { - data.push(self.reading.pop_front().unwrap()); - } - - if self.base_time.is_none() { - self.base_time = Some(std::time::Instant::now()); - } - let time = std::time::Duration::from_micros(u64::from( - secs * 1_000_000 + usecs, - )); - - self.read.push_back(Frame { time, data }); - - self.read_state = None; - } - None => { - if self.reading.len() < 12 { - break; - } - - let secs1 = self.reading.pop_front().unwrap(); - let secs2 = self.reading.pop_front().unwrap(); - let secs3 = self.reading.pop_front().unwrap(); - let secs4 = self.reading.pop_front().unwrap(); - let secs = - u32::from_le_bytes([secs1, secs2, secs3, secs4]); - let usecs1 = self.reading.pop_front().unwrap(); - let usecs2 = self.reading.pop_front().unwrap(); - let usecs3 = self.reading.pop_front().unwrap(); - let usecs4 = self.reading.pop_front().unwrap(); - let usecs = - u32::from_le_bytes([usecs1, usecs2, usecs3, usecs4]); - let len1 = self.reading.pop_front().unwrap(); - let len2 = self.reading.pop_front().unwrap(); - let len3 = self.reading.pop_front().unwrap(); - let len4 = self.reading.pop_front().unwrap(); - let len = u32::from_le_bytes([len1, len2, len3, len4]); - - self.read_state = Some((secs, usecs, len)); - } - } + pub fn frame(&mut self, data: &[u8]) -> Result<()> { + let bytes = self.creator.frame(data)?; + self.to_write.extend(bytes.iter()); + Ok(()) + } + + pub fn poll_write(&mut self) -> futures::Poll<(), Error> { + let (a, b) = self.to_write.as_slices(); + let buf = if a.is_empty() { b } else { a }; + let n = futures::try_ready!(self + .writer + .poll_write(buf) + .context(crate::error::WriteFile)); + for _ in 0..n { + self.to_write.pop_front(); } + Ok(futures::Async::Ready(())) + } + + pub fn is_empty(&self) -> bool { + self.to_write.is_empty() } } -- cgit v1.2.3-54-g00ecf