1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
use crate::shell::prelude::*;
/// Handle to a pty whose I/O is serviced by a background task spawned in
/// `Pty::new`.
pub struct Pty {
// Shared handle to the pts; a clone of this `Arc` is also handed to the
// background task, and `spawn` passes it to the command being launched.
pts: std::sync::Arc<pty_process::Pts>,
// Sending `()` on this channel asks the background task to stop.
close_w: tokio::sync::mpsc::UnboundedSender<()>,
}
impl Pty {
    /// Creates a pty, applies the initial `size`, and spawns the background
    /// task (`pty_task`) that services it.
    ///
    /// * `size` - the two components are passed, in order, to
    ///   `pty_process::Size::new`.
    /// * `entry` - shared entry that receives everything read from the pty.
    /// * `input_r` - byte buffers to be written to the pty.
    /// * `resize_r` - new sizes to apply to the pty.
    /// * `event_w` - sink for `PtyOutput` / `PtyClose` notifications.
    ///
    /// # Errors
    ///
    /// Fails if the pty cannot be created, resized, or if its pts cannot be
    /// obtained.
    pub fn new(
        size: (u16, u16),
        entry: &crate::mutex::Mutex<super::Entry>,
        input_r: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
        resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>,
        event_w: crate::shell::event::Writer,
    ) -> Result<Self> {
        let pty = pty_process::Pty::new()?;
        let (first, second) = size;
        pty.resize(pty_process::Size::new(first, second))?;
        let pts = std::sync::Arc::new(pty.pts()?);

        // The sending half stays with the handle; the receiving half is
        // moved into the background task.
        let (close_w, close_r) = tokio::sync::mpsc::unbounded_channel();

        let task = pty_task(
            pty,
            std::sync::Arc::clone(&pts),
            crate::mutex::clone(entry),
            input_r,
            resize_r,
            close_r,
            event_w,
        );
        tokio::task::spawn(task);

        Ok(Self { pts, close_w })
    }

    /// Spawns `cmd` attached to this pty's pts and returns the child process.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by `pty_process::Command::spawn`.
    pub fn spawn(
        &self,
        mut cmd: pty_process::Command,
    ) -> Result<tokio::process::Child> {
        let pts: &pty_process::Pts = &self.pts;
        let child = cmd.spawn(pts)?;
        Ok(child)
    }

    /// Asks the background task to shut down.
    ///
    /// # Panics
    ///
    /// Panics if the close signal cannot be sent (the `send` result is
    /// unwrapped), e.g. when the receiving side no longer exists.
    pub async fn close(&self) {
        let sent = self.close_w.send(());
        sent.unwrap();
    }
}
/// Background task that services a pty until it is told to close.
///
/// Four sources are merged into a single stream and handled one item at a
/// time:
///
/// * bytes read from the pty are fed to `entry` and announced with
///   `Event::PtyOutput`;
/// * buffers from `input_r` are written to the pty in full;
/// * sizes from `resize_r` are applied to the pty;
/// * a message on `close_r` emits `Event::PtyClose` and ends the task.
///
/// # Panics
///
/// Panics if reading from, writing to, or resizing the pty fails.
async fn pty_task(
    pty: pty_process::Pty,
    // take the pts here just to ensure that we don't close it before this
    // task finishes, otherwise the read call can return EIO
    _pts: std::sync::Arc<pty_process::Pts>,
    entry: crate::mutex::Mutex<super::Entry>,
    input_r: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
    resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>,
    close_r: tokio::sync::mpsc::UnboundedReceiver<()>,
    event_w: crate::shell::event::Writer,
) {
    // Common item type so that the heterogeneous sources can be merged.
    enum Res {
        Read(Result<bytes::Bytes, std::io::Error>),
        Write(Vec<u8>),
        Resize((u16, u16)),
        Close(()),
    }

    let (pty_r, mut pty_w) = pty.into_split();

    let mut stream: futures_util::stream::SelectAll<_> = [
        tokio_util::io::ReaderStream::new(pty_r)
            .map(Res::Read)
            .boxed(),
        tokio_stream::wrappers::UnboundedReceiverStream::new(input_r)
            .map(Res::Write)
            .boxed(),
        tokio_stream::wrappers::UnboundedReceiverStream::new(resize_r)
            .map(Res::Resize)
            .boxed(),
        tokio_stream::wrappers::UnboundedReceiverStream::new(close_r)
            .map(Res::Close)
            .boxed(),
    ]
    .into_iter()
    .collect();

    while let Some(res) = stream.next().await {
        match res {
            Res::Read(Ok(bytes)) => {
                entry.clone().lock_owned().await.process(&bytes);
                event_w.send(Event::PtyOutput);
            }
            Res::Read(Err(e)) => {
                panic!("pty read failed: {:?}", e);
            }
            Res::Write(bytes) => {
                // A plain `write` may accept only part of the buffer and
                // its returned count was being discarded, silently dropping
                // the remainder; `write_all` keeps writing until every byte
                // has been handed to the pty.
                pty_w.write_all(&bytes).await.unwrap();
            }
            Res::Resize(size) => pty_w
                .resize(pty_process::Size::new(size.0, size.1))
                .unwrap(),
            Res::Close(()) => {
                event_w.send(Event::PtyClose);
                return;
            }
        }
    }
}
|