1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
use futures::future::Future;
use tokio::io::AsyncRead;
use tokio_pty_process::CommandExt;
/// Errors that can occur while spawning a process or polling it for events.
#[derive(Debug)]
pub enum Error {
    /// An underlying I/O operation (opening the pty, spawning the child, or
    /// waiting for it) failed.
    IOError(std::io::Error),
}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Error::IOError(e) => write!(f, "I/O error: {}", e),
        }
    }
}

impl std::error::Error for Error {
    /// Exposes the wrapped `std::io::Error` so callers can walk the error chain.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Error::IOError(e) => Some(e),
        }
    }
}
/// Starts the command described by `line` and returns a handle to it.
///
/// This is the public entry point; it simply delegates to
/// `RunningProcess::new`, which treats the first word of `line` as the
/// program and the remaining words as its arguments.
///
/// # Errors
///
/// Returns whatever error `RunningProcess::new` reports.
pub fn spawn(line: &str) -> Result<RunningProcess, Error> {
RunningProcess::new(line)
}
/// An event yielded while polling a `RunningProcess`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProcessEvent {
    /// A chunk of bytes read from the process's pty.
    Output(Vec<u8>),
    /// The process has terminated with the given status.
    Exit(std::process::ExitStatus),
}
/// A child process running on its own pseudo-terminal.
///
/// Obtained from [`spawn`]. The value is polled as a stream of
/// [`ProcessEvent`]s; nothing is read from the process until it is polled,
/// hence the `must_use` hint on the type (the attribute is only honoured on
/// types and functions, not on `impl` blocks).
#[must_use = "streams do nothing unless polled"]
pub struct RunningProcess {
    /// Pty master the child was spawned on; the child's output is read from it.
    pty: tokio_pty_process::AsyncPtyMaster,
    /// Handle to the child, polled to obtain its exit status.
    process: tokio_pty_process::Child,
    /// Scratch buffer reused for every read from `pty`.
    buf: Vec<u8>,
    /// Set once no further output will be read from `pty`.
    output_done: bool,
    /// Set once the exit status has been yielded.
    exit_done: bool,
}
impl RunningProcess {
fn new(line: &str) -> Result<Self, Error> {
let pty = tokio_pty_process::AsyncPtyMaster::open()
.map_err(|e| Error::IOError(e))?;
let mut argv: Vec<_> = line.split(' ').collect();
let cmd = argv.remove(0);
let process = std::process::Command::new(cmd)
.args(&argv)
.spawn_pty_async(&pty)
.map_err(|e| Error::IOError(e))?;
Ok(RunningProcess {
pty,
process,
buf: Vec::with_capacity(4096),
output_done: false,
exit_done: false,
})
}
}
#[must_use = "streams do nothing unless polled"]
impl futures::stream::Stream for RunningProcess {
type Item = ProcessEvent;
type Error = Error;
fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
if !self.output_done {
self.buf.clear();
let output_poll = self
.pty
.read_buf(&mut self.buf)
.map_err(|e| Error::IOError(e));
match output_poll {
Ok(futures::Async::Ready(n)) => {
let bytes = self.buf[..n].to_vec();
return Ok(futures::Async::Ready(Some(
ProcessEvent::Output(bytes),
)));
}
Ok(futures::Async::NotReady) => {
return Ok(futures::Async::NotReady);
}
Err(_) => {
self.output_done = true;
}
}
}
if !self.exit_done {
let exit_poll =
self.process.poll().map_err(|e| Error::IOError(e));
match exit_poll {
Ok(futures::Async::Ready(status)) => {
self.exit_done = true;
return Ok(futures::Async::Ready(Some(
ProcessEvent::Exit(status),
)));
}
Ok(futures::Async::NotReady) => {
return Ok(futures::Async::NotReady);
}
Err(e) => {
return Err(e);
}
}
}
Ok(futures::Async::Ready(None))
}
}
|