1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
use async_std::io::{ReadExt as _, WriteExt as _};
use futures_lite::future::FutureExt as _;
pub struct Pty {
pty: async_std::sync::Arc<pty_process::Pty>,
close_w: async_std::channel::Sender<()>,
}
impl Pty {
pub fn new(
size: (u16, u16),
entry: &async_std::sync::Arc<async_std::sync::Mutex<super::Entry>>,
input_r: async_std::channel::Receiver<Vec<u8>>,
resize_r: async_std::channel::Receiver<(u16, u16)>,
event_w: async_std::channel::Sender<crate::event::Event>,
) -> anyhow::Result<Self> {
let (close_w, close_r) = async_std::channel::unbounded();
let pty = pty_process::Pty::new()?;
pty.resize(pty_process::Size::new(size.0, size.1))?;
let pty = async_std::sync::Arc::new(pty);
async_std::task::spawn(pty_task(
async_std::sync::Arc::clone(&pty),
async_std::sync::Arc::clone(entry),
input_r,
resize_r,
close_r,
event_w,
));
Ok(Self { pty, close_w })
}
pub fn spawn(
&self,
mut cmd: pty_process::Command,
) -> anyhow::Result<async_std::process::Child> {
Ok(cmd.spawn(&self.pty)?)
}
pub async fn close(&self) {
self.close_w.send(()).await.unwrap();
}
}
async fn pty_task(
pty: async_std::sync::Arc<pty_process::Pty>,
entry: async_std::sync::Arc<async_std::sync::Mutex<super::Entry>>,
input_r: async_std::channel::Receiver<Vec<u8>>,
resize_r: async_std::channel::Receiver<(u16, u16)>,
close_r: async_std::channel::Receiver<()>,
event_w: async_std::channel::Sender<crate::event::Event>,
) {
loop {
enum Res {
Read(Result<usize, std::io::Error>),
Write(Result<Vec<u8>, async_std::channel::RecvError>),
Resize(Result<(u16, u16), async_std::channel::RecvError>),
Close(Result<(), async_std::channel::RecvError>),
}
let mut buf = [0_u8; 4096];
let read = async { Res::Read((&*pty).read(&mut buf).await) };
let write = async { Res::Write(input_r.recv().await) };
let resize = async { Res::Resize(resize_r.recv().await) };
let close = async { Res::Close(close_r.recv().await) };
match read.race(write).race(resize).or(close).await {
Res::Read(res) => match res {
Ok(bytes) => {
entry.lock_arc().await.vt.process(&buf[..bytes]);
event_w
.send(crate::event::Event::PtyOutput)
.await
.unwrap();
}
Err(e) => {
if e.raw_os_error() != Some(libc::EIO) {
panic!("pty read failed: {:?}", e);
}
}
},
Res::Write(res) => match res {
Ok(bytes) => {
(&*pty).write(&bytes).await.unwrap();
}
Err(e) => {
panic!("failed to read from input channel: {}", e);
}
},
Res::Resize(res) => match res {
Ok(size) => {
pty.resize(pty_process::Size::new(size.0, size.1))
.unwrap();
entry.lock_arc().await.vt.set_size(size.0, size.1);
}
Err(e) => {
panic!("failed to read from resize channel: {}", e);
}
},
Res::Close(res) => match res {
Ok(()) => {
event_w
.send(crate::event::Event::PtyClose)
.await
.unwrap();
return;
}
Err(e) => {
panic!("failed to read from close channel: {}", e);
}
},
}
}
}
|