aboutsummaryrefslogtreecommitdiffstats
path: root/src/process.rs
blob: 70e117838fd516cdb0bc77f7fab4c517281a68e2 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
use futures::future::Future;
use tokio::io::AsyncRead;
use tokio_pty_process::CommandExt;

#[derive(Debug)]
pub enum Error {
    IOError(std::io::Error),
}

pub fn spawn(line: &str) -> Result<RunningProcess, Error> {
    RunningProcess::new(line)
}

pub enum ProcessEvent {
    Output(Vec<u8>),
    Exit(std::process::ExitStatus),
}

pub struct RunningProcess {
    pty: tokio_pty_process::AsyncPtyMaster,
    process: tokio_pty_process::Child,
    buf: Vec<u8>,
    output_done: bool,
    exit_done: bool,
}

impl RunningProcess {
    fn new(line: &str) -> Result<Self, Error> {
        let pty = tokio_pty_process::AsyncPtyMaster::open()
            .map_err(|e| Error::IOError(e))?;

        let mut argv: Vec<_> = line.split(' ').collect();
        let cmd = argv.remove(0);
        let process = std::process::Command::new(cmd)
            .args(&argv)
            .spawn_pty_async(&pty)
            .map_err(|e| Error::IOError(e))?;

        Ok(RunningProcess {
            pty,
            process,
            buf: Vec::with_capacity(4096),
            output_done: false,
            exit_done: false,
        })
    }
}

#[must_use = "streams do nothing unless polled"]
impl futures::stream::Stream for RunningProcess {
    type Item = ProcessEvent;
    type Error = Error;

    fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
        if !self.output_done {
            self.buf.clear();
            let output_poll = self
                .pty
                .read_buf(&mut self.buf)
                .map_err(|e| Error::IOError(e));
            match output_poll {
                Ok(futures::Async::Ready(n)) => {
                    let bytes = self.buf[..n].to_vec();
                    return Ok(futures::Async::Ready(Some(
                        ProcessEvent::Output(bytes),
                    )));
                }
                Ok(futures::Async::NotReady) => {
                    return Ok(futures::Async::NotReady);
                }
                Err(_) => {
                    self.output_done = true;
                }
            }
        }

        if !self.exit_done {
            let exit_poll =
                self.process.poll().map_err(|e| Error::IOError(e));
            match exit_poll {
                Ok(futures::Async::Ready(status)) => {
                    self.exit_done = true;
                    return Ok(futures::Async::Ready(Some(
                        ProcessEvent::Exit(status),
                    )));
                }
                Ok(futures::Async::NotReady) => {
                    return Ok(futures::Async::NotReady);
                }
                Err(e) => {
                    return Err(e);
                }
            }
        }

        Ok(futures::Async::Ready(None))
    }
}