use crate::prelude::*; use std::convert::TryFrom as _; use tokio::io::{AsyncRead as _, AsyncWrite as _}; pub struct Frame { pub time: std::time::Duration, pub data: Vec, } impl Frame { fn as_bytes(&self) -> Vec { let secs = u32::try_from(self.time.as_secs()).unwrap(); let micros = self.time.subsec_micros(); let len = u32::try_from(self.data.len()).unwrap(); let mut bytes = vec![]; bytes.extend(secs.to_le_bytes().iter()); bytes.extend(micros.to_le_bytes().iter()); bytes.extend(len.to_le_bytes().iter()); bytes.extend(self.data.iter()); bytes } } pub struct File { file: tokio::fs::File, base_time: Option, waiting: usize, wframe: futures::sink::Wait>, rframe: tokio::sync::mpsc::UnboundedReceiver, writing: std::collections::VecDeque, read_buf: [u8; 4096], reading: std::collections::VecDeque, read_state: Option<(u32, u32, u32)>, read: std::collections::VecDeque, } impl File { pub fn new(file: tokio::fs::File) -> Self { let (wframe, rframe) = tokio::sync::mpsc::unbounded_channel(); Self { file, base_time: None, waiting: 0, wframe: wframe.wait(), rframe, writing: std::collections::VecDeque::new(), read_buf: [0; 4096], reading: std::collections::VecDeque::new(), read_state: None, read: std::collections::VecDeque::new(), } } pub fn write_frame(&mut self, data: &[u8]) -> Result<()> { let now = std::time::Instant::now(); let base_time = if let Some(base_time) = &self.base_time { *base_time } else { self.base_time = Some(now); now }; self.waiting += 1; self.wframe .send(Frame { time: now - base_time, data: data.to_vec(), }) .context(crate::error::WriteChannel) } pub fn poll_write( &mut self, ) -> std::result::Result, Error> { loop { if self.writing.is_empty() { match self.rframe.poll().context(crate::error::ReadChannel)? { futures::Async::Ready(Some(frame)) => { self.writing.extend(frame.as_bytes().iter()); self.waiting -= 1; } futures::Async::Ready(None) => unreachable!(), futures::Async::NotReady => { return Ok(futures::Async::NotReady); } } } let (a, b) = self.writing.as_slices(); let buf = if a.is_empty() { b } else { a }; match self .file .poll_write(buf) .context(crate::error::WriteFile)? { futures::Async::Ready(n) => { for _ in 0..n { self.writing.pop_front(); } } futures::Async::NotReady => { return Ok(futures::Async::NotReady); } } } } pub fn poll_read( &mut self, ) -> std::result::Result>, Error> { loop { if let Some(frame) = self.read.pop_front() { return Ok(futures::Async::Ready(Some(frame))); } match self .file .poll_read(&mut self.read_buf) .context(crate::error::ReadFile)? { futures::Async::Ready(n) => { if n > 0 { self.reading.extend(self.read_buf[..n].iter()); self.parse_frames(); } else { return Ok(futures::Async::Ready(None)); } } futures::Async::NotReady => { return Ok(futures::Async::NotReady); } } } } pub fn is_empty(&self) -> bool { self.writing.is_empty() && self.waiting == 0 } fn parse_frames(&mut self) { loop { match self.read_state { Some((secs, usecs, len)) => { if self.reading.len() < len as usize { break; } let mut data = vec![]; for _ in 0..len { data.push(self.reading.pop_front().unwrap()); } if self.base_time.is_none() { self.base_time = Some(std::time::Instant::now()); } let time = std::time::Duration::from_micros(u64::from( secs * 1_000_000 + usecs, )); self.read.push_back(Frame { time, data }); self.read_state = None; } None => { if self.reading.len() < 12 { break; } let secs1 = self.reading.pop_front().unwrap(); let secs2 = self.reading.pop_front().unwrap(); let secs3 = self.reading.pop_front().unwrap(); let secs4 = self.reading.pop_front().unwrap(); let secs = u32::from_le_bytes([secs1, secs2, secs3, secs4]); let usecs1 = self.reading.pop_front().unwrap(); let usecs2 = self.reading.pop_front().unwrap(); let usecs3 = self.reading.pop_front().unwrap(); let usecs4 = self.reading.pop_front().unwrap(); let usecs = u32::from_le_bytes([usecs1, usecs2, usecs3, usecs4]); let len1 = self.reading.pop_front().unwrap(); let len2 = self.reading.pop_front().unwrap(); let len3 = self.reading.pop_front().unwrap(); let len4 = self.reading.pop_front().unwrap(); let len = u32::from_le_bytes([len1, len2, len3, len4]); self.read_state = Some((secs, usecs, len)); } } } } }