aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-10-25 05:59:21 -0400
committerJesse Luehrs <doy@tozt.net>2019-10-25 05:59:21 -0400
commit8f13228756dea88f1b8bdeaba1b46db400bab68c (patch)
tree14f43fe2e4a4dfceca01298fa200319c8d120a09
parenta48380106d19624641f63b0fca70a41cd35fe811 (diff)
downloadteleterm-8f13228756dea88f1b8bdeaba1b46db400bab68c.tar.gz
teleterm-8f13228756dea88f1b8bdeaba1b46db400bab68c.zip
simplify ttyrec implementation
-rw-r--r--src/cmd/play.rs10
-rw-r--r--src/cmd/record.rs36
-rw-r--r--src/ttyrec.rs290
3 files changed, 175 insertions, 161 deletions
diff --git a/src/cmd/play.rs b/src/cmd/play.rs
index 368e368..d73dcb4 100644
--- a/src/cmd/play.rs
+++ b/src/cmd/play.rs
@@ -50,7 +50,7 @@ enum FileState {
fut: tokio::fs::file::OpenFuture<String>,
},
Open {
- file: crate::ttyrec::File,
+ reader: crate::ttyrec::Reader<tokio::fs::File>,
},
Eof,
}
@@ -106,8 +106,8 @@ impl PlaySession {
filename: filename.to_string(),
}
}));
- let file = crate::ttyrec::File::new(file);
- self.file = FileState::Open { file };
+ let reader = crate::ttyrec::Reader::new(file);
+ self.file = FileState::Open { reader };
Ok(component_future::Async::DidWork)
}
_ => Ok(component_future::Async::NothingToDo),
@@ -115,9 +115,9 @@ impl PlaySession {
}
fn poll_read_file(&mut self) -> component_future::Poll<(), Error> {
- if let FileState::Open { file } = &mut self.file {
+ if let FileState::Open { reader } = &mut self.file {
if let Some(frame) =
- component_future::try_ready!(file.poll_read())
+ component_future::try_ready!(reader.poll_read())
{
self.to_write
.insert_at(frame.data, self.base_time + frame.time);
diff --git a/src/cmd/record.rs b/src/cmd/record.rs
index 6fda88f..46e76e1 100644
--- a/src/cmd/record.rs
+++ b/src/cmd/record.rs
@@ -62,7 +62,7 @@ enum FileState {
fut: tokio::fs::file::CreateFuture<String>,
},
Open {
- file: crate::ttyrec::File,
+ writer: crate::ttyrec::Writer<tokio::fs::File>,
},
}
@@ -141,9 +141,11 @@ impl RecordSession {
filename: filename.clone(),
}
}));
- let mut file = crate::ttyrec::File::new(file);
- file.write_frame(self.buffer.contents())?;
- self.file = FileState::Open { file };
+ let mut writer = crate::ttyrec::Writer::new(file);
+ if !self.buffer.contents().is_empty() {
+ writer.frame(self.buffer.contents())?;
+ }
+ self.file = FileState::Open { writer };
Ok(component_future::Async::DidWork)
}
FileState::Open { .. } => {
@@ -173,8 +175,8 @@ impl RecordSession {
}
tokio_pty_process_stream::Event::Output { data } => {
self.record_bytes(&data);
- if let FileState::Open { file } = &mut self.file {
- file.write_frame(&data)?;
+ if let FileState::Open { writer } = &mut self.file {
+ writer.frame(&data)?;
}
}
}
@@ -222,23 +224,23 @@ impl RecordSession {
}
fn poll_write_file(&mut self) -> component_future::Poll<(), Error> {
- let file = match &mut self.file {
- FileState::Open { file } => file,
+ let writer = match &mut self.file {
+ FileState::Open { writer } => writer,
_ => {
return Ok(component_future::Async::NothingToDo);
}
};
- match file.poll_write()? {
- futures::Async::Ready(()) => Ok(component_future::Async::DidWork),
- futures::Async::NotReady => {
- // ship all data to the server before actually ending
- if self.done && file.is_empty() {
- Ok(component_future::Async::Ready(()))
- } else {
- Ok(component_future::Async::NotReady)
- }
+ if writer.is_empty() {
+ // finish writing to the file before actually ending
+ if self.done {
+ Ok(component_future::Async::Ready(()))
+ } else {
+ Ok(component_future::Async::NothingToDo)
}
+ } else {
+ component_future::try_ready!(writer.poll_write());
+ Ok(component_future::Async::DidWork)
}
}
}
diff --git a/src/ttyrec.rs b/src/ttyrec.rs
index 2312181..2ccefdc 100644
--- a/src/ttyrec.rs
+++ b/src/ttyrec.rs
@@ -1,6 +1,5 @@
use crate::prelude::*;
use std::convert::TryFrom as _;
-use tokio::io::{AsyncRead as _, AsyncWrite as _};
pub struct Frame {
pub time: std::time::Duration,
@@ -31,173 +30,186 @@ impl std::convert::TryFrom<Frame> for Vec<u8> {
}
}
-pub struct File {
- file: tokio::fs::File,
- base_time: Option<std::time::Instant>,
- waiting: usize,
- wframe: futures::sink::Wait<tokio::sync::mpsc::UnboundedSender<Frame>>,
- rframe: tokio::sync::mpsc::UnboundedReceiver<Frame>,
- writing: std::collections::VecDeque<u8>,
- read_buf: [u8; 4096],
+#[repr(packed)]
+struct Header {
+ secs: u32,
+ micros: u32,
+ len: u32,
+}
+
+impl Header {
+ fn time(&self) -> std::time::Duration {
+ std::time::Duration::from_micros(u64::from(
+ self.secs * 1_000_000 + self.micros,
+ ))
+ }
+
+ fn len(&self) -> usize {
+ self.len as usize
+ }
+}
+
+pub struct Parser {
reading: std::collections::VecDeque<u8>,
- read_state: Option<(u32, u32, u32)>,
- read: std::collections::VecDeque<Frame>,
+ read_state: Option<Header>,
}
-impl File {
- pub fn new(file: tokio::fs::File) -> Self {
- let (wframe, rframe) = tokio::sync::mpsc::unbounded_channel();
+impl Parser {
+ pub fn new() -> Self {
Self {
- file,
- base_time: None,
- waiting: 0,
- wframe: wframe.wait(),
- rframe,
- writing: std::collections::VecDeque::new(),
- read_buf: [0; 4096],
reading: std::collections::VecDeque::new(),
read_state: None,
- read: std::collections::VecDeque::new(),
}
}
- pub fn write_frame(&mut self, data: &[u8]) -> Result<()> {
- let now = std::time::Instant::now();
- let base_time = if let Some(base_time) = &self.base_time {
- *base_time
+ pub fn add_bytes(&mut self, bytes: &[u8]) {
+ self.reading.extend(bytes.iter());
+ }
+
+ pub fn next_frame(&mut self) -> Option<Frame> {
+ let header = if let Some(header) = &self.read_state {
+ header
} else {
- self.base_time = Some(now);
- now
+ if self.reading.len() < std::mem::size_of::<Header>() {
+ return None;
+ }
+
+ let secs1 = self.reading.pop_front().unwrap();
+ let secs2 = self.reading.pop_front().unwrap();
+ let secs3 = self.reading.pop_front().unwrap();
+ let secs4 = self.reading.pop_front().unwrap();
+ let secs = u32::from_le_bytes([secs1, secs2, secs3, secs4]);
+
+ let usecs1 = self.reading.pop_front().unwrap();
+ let usecs2 = self.reading.pop_front().unwrap();
+ let usecs3 = self.reading.pop_front().unwrap();
+ let usecs4 = self.reading.pop_front().unwrap();
+ let micros = u32::from_le_bytes([usecs1, usecs2, usecs3, usecs4]);
+
+ let len1 = self.reading.pop_front().unwrap();
+ let len2 = self.reading.pop_front().unwrap();
+ let len3 = self.reading.pop_front().unwrap();
+ let len4 = self.reading.pop_front().unwrap();
+ let len = u32::from_le_bytes([len1, len2, len3, len4]);
+
+ let header = Header { secs, micros, len };
+ self.read_state = Some(header);
+ self.read_state.as_ref().unwrap()
};
- self.waiting += 1;
- self.wframe
- .send(Frame {
- time: now - base_time,
- data: data.to_vec(),
- })
- .context(crate::error::WriteChannel)
+ if self.reading.len() < header.len() {
+ return None;
+ }
+
+ let mut data = vec![];
+ for _ in 0..header.len() {
+ data.push(self.reading.pop_front().unwrap());
+ }
+
+ let time = header.time();
+
+ self.read_state = None;
+ Some(Frame { time, data })
}
+}
- pub fn poll_write(
- &mut self,
- ) -> std::result::Result<futures::Async<()>, Error> {
- loop {
- if self.writing.is_empty() {
- match self.rframe.poll().context(crate::error::ReadChannel)? {
- futures::Async::Ready(Some(frame)) => {
- self.writing
- .extend(Vec::<u8>::try_from(frame)?.iter());
- self.waiting -= 1;
- }
- futures::Async::Ready(None) => unreachable!(),
- futures::Async::NotReady => {
- return Ok(futures::Async::NotReady);
- }
- }
- }
+pub struct Creator {
+ base_time: Option<std::time::Instant>,
+}
- let (a, b) = self.writing.as_slices();
- let buf = if a.is_empty() { b } else { a };
- match self
- .file
- .poll_write(buf)
- .context(crate::error::WriteFile)?
- {
- futures::Async::Ready(n) => {
- for _ in 0..n {
- self.writing.pop_front();
- }
- }
- futures::Async::NotReady => {
- return Ok(futures::Async::NotReady);
- }
- }
+impl Creator {
+ pub fn new() -> Self {
+ Self { base_time: None }
+ }
+
+ pub fn frame(&mut self, data: &[u8]) -> Result<Vec<u8>> {
+ let cur_time = std::time::Instant::now();
+ let base_time = if let Some(base_time) = &self.base_time {
+ base_time
+ } else {
+ self.base_time = Some(cur_time);
+ self.base_time.as_ref().unwrap()
+ };
+ Vec::<u8>::try_from(Frame {
+ time: cur_time - *base_time,
+ data: data.to_vec(),
+ })
+ }
+}
+
+pub struct Reader<R> {
+ reader: R,
+ parser: Parser,
+ read_buf: [u8; 4096],
+ done_reading: bool,
+}
+
+impl<R: tokio::io::AsyncRead> Reader<R> {
+ pub fn new(reader: R) -> Self {
+ Self {
+ reader,
+ parser: Parser::new(),
+ read_buf: [0; 4096],
+ done_reading: false,
}
}
- pub fn poll_read(
- &mut self,
- ) -> std::result::Result<futures::Async<Option<Frame>>, Error> {
+ pub fn poll_read(&mut self) -> futures::Poll<Option<Frame>, Error> {
loop {
- if let Some(frame) = self.read.pop_front() {
+ if let Some(frame) = self.parser.next_frame() {
return Ok(futures::Async::Ready(Some(frame)));
+ } else if self.done_reading {
+ return Ok(futures::Async::Ready(None));
}
- match self
- .file
+ let n = futures::try_ready!(self
+ .reader
.poll_read(&mut self.read_buf)
- .context(crate::error::ReadFile)?
- {
- futures::Async::Ready(n) => {
- if n > 0 {
- self.reading.extend(self.read_buf[..n].iter());
- self.parse_frames();
- } else {
- return Ok(futures::Async::Ready(None));
- }
- }
- futures::Async::NotReady => {
- return Ok(futures::Async::NotReady);
- }
+ .context(crate::error::ReadFile));
+ if n > 0 {
+ self.parser.add_bytes(&self.read_buf[..n]);
+ } else {
+ self.done_reading = true;
}
}
}
+}
- pub fn is_empty(&self) -> bool {
- self.writing.is_empty() && self.waiting == 0
+pub struct Writer<W> {
+ writer: W,
+ creator: Creator,
+ to_write: std::collections::VecDeque<u8>,
+}
+
+impl<W: tokio::io::AsyncWrite> Writer<W> {
+ pub fn new(writer: W) -> Self {
+ Self {
+ writer,
+ creator: Creator::new(),
+ to_write: std::collections::VecDeque::new(),
+ }
}
- fn parse_frames(&mut self) {
- loop {
- match self.read_state {
- Some((secs, usecs, len)) => {
- if self.reading.len() < len as usize {
- break;
- }
-
- let mut data = vec![];
- for _ in 0..len {
- data.push(self.reading.pop_front().unwrap());
- }
-
- if self.base_time.is_none() {
- self.base_time = Some(std::time::Instant::now());
- }
- let time = std::time::Duration::from_micros(u64::from(
- secs * 1_000_000 + usecs,
- ));
-
- self.read.push_back(Frame { time, data });
-
- self.read_state = None;
- }
- None => {
- if self.reading.len() < 12 {
- break;
- }
-
- let secs1 = self.reading.pop_front().unwrap();
- let secs2 = self.reading.pop_front().unwrap();
- let secs3 = self.reading.pop_front().unwrap();
- let secs4 = self.reading.pop_front().unwrap();
- let secs =
- u32::from_le_bytes([secs1, secs2, secs3, secs4]);
- let usecs1 = self.reading.pop_front().unwrap();
- let usecs2 = self.reading.pop_front().unwrap();
- let usecs3 = self.reading.pop_front().unwrap();
- let usecs4 = self.reading.pop_front().unwrap();
- let usecs =
- u32::from_le_bytes([usecs1, usecs2, usecs3, usecs4]);
- let len1 = self.reading.pop_front().unwrap();
- let len2 = self.reading.pop_front().unwrap();
- let len3 = self.reading.pop_front().unwrap();
- let len4 = self.reading.pop_front().unwrap();
- let len = u32::from_le_bytes([len1, len2, len3, len4]);
-
- self.read_state = Some((secs, usecs, len));
- }
- }
+ pub fn frame(&mut self, data: &[u8]) -> Result<()> {
+ let bytes = self.creator.frame(data)?;
+ self.to_write.extend(bytes.iter());
+ Ok(())
+ }
+
+ pub fn poll_write(&mut self) -> futures::Poll<(), Error> {
+ let (a, b) = self.to_write.as_slices();
+ let buf = if a.is_empty() { b } else { a };
+ let n = futures::try_ready!(self
+ .writer
+ .poll_write(buf)
+ .context(crate::error::WriteFile));
+ for _ in 0..n {
+ self.to_write.pop_front();
}
+ Ok(futures::Async::Ready(()))
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.to_write.is_empty()
}
}