summaryrefslogtreecommitdiffstats
path: root/src/shell/history/pty.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/shell/history/pty.rs')
-rw-r--r--src/shell/history/pty.rs118
1 files changed, 56 insertions, 62 deletions
diff --git a/src/shell/history/pty.rs b/src/shell/history/pty.rs
index 5a51e73..acfe500 100644
--- a/src/shell/history/pty.rs
+++ b/src/shell/history/pty.rs
@@ -1,26 +1,26 @@
use crate::shell::prelude::*;
pub struct Pty {
- pty: async_std::sync::Arc<pty_process::Pty>,
- close_w: async_std::channel::Sender<()>,
+ pts: pty_process::Pts,
+ close_w: tokio::sync::mpsc::UnboundedSender<()>,
}
impl Pty {
pub fn new(
size: (u16, u16),
entry: &crate::mutex::Mutex<super::Entry>,
- input_r: async_std::channel::Receiver<Vec<u8>>,
- resize_r: async_std::channel::Receiver<(u16, u16)>,
- event_w: async_std::channel::Sender<Event>,
+ input_r: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
+ resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>,
+ event_w: tokio::sync::mpsc::UnboundedSender<Event>,
) -> anyhow::Result<Self> {
- let (close_w, close_r) = async_std::channel::unbounded();
+ let (close_w, close_r) = tokio::sync::mpsc::unbounded_channel();
let pty = pty_process::Pty::new()?;
pty.resize(pty_process::Size::new(size.0, size.1))?;
- let pty = async_std::sync::Arc::new(pty);
+ let pts = pty.pts()?;
- async_std::task::spawn(pty_task(
- async_std::sync::Arc::clone(&pty),
+ tokio::task::spawn(pty_task(
+ pty,
crate::mutex::clone(entry),
input_r,
resize_r,
@@ -28,80 +28,74 @@ impl Pty {
event_w,
));
- Ok(Self { pty, close_w })
+ Ok(Self { pts, close_w })
}
pub fn spawn(
&self,
mut cmd: pty_process::Command,
- ) -> anyhow::Result<async_std::process::Child> {
- Ok(cmd.spawn(&self.pty)?)
+ ) -> anyhow::Result<tokio::process::Child> {
+ Ok(cmd.spawn(&self.pts)?)
}
pub async fn close(&self) {
- self.close_w.send(()).await.unwrap();
+ self.close_w.send(()).unwrap();
}
}
async fn pty_task(
- pty: async_std::sync::Arc<pty_process::Pty>,
+ pty: pty_process::Pty,
entry: crate::mutex::Mutex<super::Entry>,
- input_r: async_std::channel::Receiver<Vec<u8>>,
- resize_r: async_std::channel::Receiver<(u16, u16)>,
- close_r: async_std::channel::Receiver<()>,
- event_w: async_std::channel::Sender<Event>,
+ input_r: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
+ resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>,
+ close_r: tokio::sync::mpsc::UnboundedReceiver<()>,
+ event_w: tokio::sync::mpsc::UnboundedSender<Event>,
) {
- loop {
- enum Res {
- Read(Result<usize, std::io::Error>),
- Write(Result<Vec<u8>, async_std::channel::RecvError>),
- Resize(Result<(u16, u16), async_std::channel::RecvError>),
- Close(Result<(), async_std::channel::RecvError>),
- }
- let mut buf = [0_u8; 4096];
- let read = async { Res::Read((&*pty).read(&mut buf).await) };
- let write = async { Res::Write(input_r.recv().await) };
- let resize = async { Res::Resize(resize_r.recv().await) };
- let close = async { Res::Close(close_r.recv().await) };
- match read.race(write).race(resize).or(close).await {
+ enum Res {
+ Read(Result<bytes::Bytes, std::io::Error>),
+ Write(Vec<u8>),
+ Resize((u16, u16)),
+ Close(()),
+ }
+
+ let (pty_r, mut pty_w) = pty.into_split();
+ let mut stream: futures_util::stream::SelectAll<_> = [
+ tokio_util::io::ReaderStream::new(pty_r)
+ .map(Res::Read)
+ .boxed(),
+ tokio_stream::wrappers::UnboundedReceiverStream::new(input_r)
+ .map(Res::Write)
+ .boxed(),
+ tokio_stream::wrappers::UnboundedReceiverStream::new(resize_r)
+ .map(Res::Resize)
+ .boxed(),
+ tokio_stream::wrappers::UnboundedReceiverStream::new(close_r)
+ .map(Res::Close)
+ .boxed(),
+ ]
+ .into_iter()
+ .collect();
+ while let Some(res) = stream.next().await {
+ match res {
Res::Read(res) => match res {
Ok(bytes) => {
- entry.lock_arc().await.process(&buf[..bytes]);
- event_w.send(Event::PtyOutput).await.unwrap();
+ entry.clone().lock_owned().await.process(&bytes);
+ event_w.send(Event::PtyOutput).unwrap();
}
Err(e) => {
- if e.raw_os_error() == Some(libc::EIO) {
- continue;
- }
panic!("pty read failed: {:?}", e);
}
},
- Res::Write(res) => match res {
- Ok(bytes) => {
- (&*pty).write(&bytes).await.unwrap();
- }
- Err(e) => {
- panic!("failed to read from input channel: {}", e);
- }
- },
- Res::Resize(res) => match res {
- Ok(size) => {
- pty.resize(pty_process::Size::new(size.0, size.1))
- .unwrap();
- }
- Err(e) => {
- panic!("failed to read from resize channel: {}", e);
- }
- },
- Res::Close(res) => match res {
- Ok(()) => {
- event_w.send(Event::PtyClose).await.unwrap();
- return;
- }
- Err(e) => {
- panic!("failed to read from close channel: {}", e);
- }
- },
+ Res::Write(bytes) => {
+ pty_w.write(&bytes).await.unwrap();
+ }
+ Res::Resize(size) => pty_w
+ .resize(pty_process::Size::new(size.0, size.1))
+ .unwrap(),
+ Res::Close(()) => {
+ event_w.send(Event::PtyClose).unwrap();
+ return;
+ }
}
}
}