diff options
Diffstat (limited to 'src/process.rs')
-rw-r--r-- | src/process.rs | 116 |
1 files changed, 74 insertions, 42 deletions
diff --git a/src/process.rs b/src/process.rs index 7b8d8e7..70e1178 100644 --- a/src/process.rs +++ b/src/process.rs @@ -1,5 +1,4 @@ use futures::future::Future; -use futures::try_ready; use tokio::io::AsyncRead; use tokio_pty_process::CommandExt; @@ -8,59 +7,92 @@ pub enum Error { IOError(std::io::Error), } -pub fn spawn( - line: &str, -) -> Result< - ( - PtyStream, - impl futures::future::Future< - Item = std::process::ExitStatus, - Error = Error, - >, - ), - Error, -> { - let master = tokio_pty_process::AsyncPtyMaster::open() - .map_err(|e| Error::IOError(e))?; - let mut argv: Vec<_> = line.split(' ').collect(); - let cmd = argv.remove(0); - let child = std::process::Command::new(cmd) - .args(&argv) - .spawn_pty_async(&master) - .map_err(|e| Error::IOError(e))? - .map_err(|e| Error::IOError(e)); - let stream = PtyStream::new(master); - Ok((stream, child)) +pub fn spawn(line: &str) -> Result<RunningProcess, Error> { + RunningProcess::new(line) } -pub struct PtyStream { - master: tokio_pty_process::AsyncPtyMaster, +pub enum ProcessEvent { + Output(Vec<u8>), + Exit(std::process::ExitStatus), +} + +pub struct RunningProcess { + pty: tokio_pty_process::AsyncPtyMaster, + process: tokio_pty_process::Child, buf: Vec<u8>, + output_done: bool, + exit_done: bool, } -impl PtyStream { - fn new(master: tokio_pty_process::AsyncPtyMaster) -> Self { - let buf = Vec::with_capacity(4096); - PtyStream { master, buf } +impl RunningProcess { + fn new(line: &str) -> Result<Self, Error> { + let pty = tokio_pty_process::AsyncPtyMaster::open() + .map_err(|e| Error::IOError(e))?; + + let mut argv: Vec<_> = line.split(' ').collect(); + let cmd = argv.remove(0); + let process = std::process::Command::new(cmd) + .args(&argv) + .spawn_pty_async(&pty) + .map_err(|e| Error::IOError(e))?; + + Ok(RunningProcess { + pty, + process, + buf: Vec::with_capacity(4096), + output_done: false, + exit_done: false, + }) } } #[must_use = "streams do nothing unless polled"] -impl futures::stream::Stream for PtyStream { - type Item = Vec<u8>; +impl futures::stream::Stream for RunningProcess { + type Item = ProcessEvent; type Error = Error; fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> { - self.buf.clear(); - let n = try_ready!(self - .master - .read_buf(&mut self.buf) - .map_err(|e| { Error::IOError(e) })); - if n > 0 { - let bytes = self.buf[..n].to_vec(); - Ok(futures::Async::Ready(Some(bytes))) - } else { - Ok(futures::Async::NotReady) + if !self.output_done { + self.buf.clear(); + let output_poll = self + .pty + .read_buf(&mut self.buf) + .map_err(|e| Error::IOError(e)); + match output_poll { + Ok(futures::Async::Ready(n)) => { + let bytes = self.buf[..n].to_vec(); + return Ok(futures::Async::Ready(Some( + ProcessEvent::Output(bytes), + ))); + } + Ok(futures::Async::NotReady) => { + return Ok(futures::Async::NotReady); + } + Err(_) => { + self.output_done = true; + } + } } + + if !self.exit_done { + let exit_poll = + self.process.poll().map_err(|e| Error::IOError(e)); + match exit_poll { + Ok(futures::Async::Ready(status)) => { + self.exit_done = true; + return Ok(futures::Async::Ready(Some( + ProcessEvent::Exit(status), + ))); + } + Ok(futures::Async::NotReady) => { + return Ok(futures::Async::NotReady); + } + Err(e) => { + return Err(e); + } + } + } + + Ok(futures::Async::Ready(None)) } } |