diff options
author | Jesse Luehrs <doy@tozt.net> | 2022-02-25 17:32:58 -0500 |
---|---|---|
committer | Jesse Luehrs <doy@tozt.net> | 2022-02-25 17:32:58 -0500 |
commit | a2462bbaea13f7a3f3eb65e7430b30618bc203b8 (patch) | |
tree | 8cb2eab6904c2c9f4138f8221caa947c6b805d3b /src/shell/history | |
parent | 7b66471194490a1421001fd51d073cc6d18848ea (diff) | |
download | nbsh-a2462bbaea13f7a3f3eb65e7430b30618bc203b8.tar.gz nbsh-a2462bbaea13f7a3f3eb65e7430b30618bc203b8.zip |
move to tokio
Diffstat (limited to 'src/shell/history')
-rw-r--r-- | src/shell/history/entry.rs | 20 | ||||
-rw-r--r-- | src/shell/history/mod.rs | 152 | ||||
-rw-r--r-- | src/shell/history/pty.rs | 118 |
3 files changed, 131 insertions, 159 deletions
diff --git a/src/shell/history/entry.rs b/src/shell/history/entry.rs index a45d99d..97e8a7b 100644 --- a/src/shell/history/entry.rs +++ b/src/shell/history/entry.rs @@ -16,8 +16,8 @@ pub struct Entry { visual_bell: bool, real_bell_pending: bool, fullscreen: Option<bool>, - input: async_std::channel::Sender<Vec<u8>>, - resize: async_std::channel::Sender<(u16, u16)>, + input: tokio::sync::mpsc::UnboundedSender<Vec<u8>>, + resize: tokio::sync::mpsc::UnboundedSender<(u16, u16)>, start_time: time::OffsetDateTime, start_instant: std::time::Instant, } @@ -27,8 +27,8 @@ impl Entry { cmdline: String, env: Env, size: (u16, u16), - input: async_std::channel::Sender<Vec<u8>>, - resize: async_std::channel::Sender<(u16, u16)>, + input: tokio::sync::mpsc::UnboundedSender<Vec<u8>>, + resize: tokio::sync::mpsc::UnboundedSender<(u16, u16)>, ) -> Self { let span = (0, cmdline.len()); Self { @@ -229,13 +229,13 @@ impl Entry { pub async fn send_input(&self, bytes: Vec<u8>) { if self.running() { - self.input.send(bytes).await.unwrap(); + self.input.send(bytes).unwrap(); } } pub async fn resize(&mut self, size: (u16, u16)) { if self.running() { - self.resize.send(size).await.unwrap(); + self.resize.send(size).unwrap(); self.vt.set_size(size.0, size.1); } } @@ -341,11 +341,11 @@ impl Entry { pub async fn finish( &mut self, env: Env, - event_w: async_std::channel::Sender<Event>, + event_w: tokio::sync::mpsc::UnboundedSender<Event>, ) { self.state = State::Exited(ExitInfo::new(env.latest_status())); self.env = env; - event_w.send(Event::PtyClose).await.unwrap(); + event_w.send(Event::PtyClose).unwrap(); } fn exit_info(&self) -> Option<&ExitInfo> { @@ -369,12 +369,12 @@ impl Entry { } struct ExitInfo { - status: async_std::process::ExitStatus, + status: std::process::ExitStatus, instant: std::time::Instant, } impl ExitInfo { - fn new(status: async_std::process::ExitStatus) -> Self { + fn new(status: std::process::ExitStatus) -> Self { Self { status, instant: std::time::Instant::now(), diff --git a/src/shell/history/mod.rs b/src/shell/history/mod.rs index 1bc4e62..2eeab0b 100644 --- a/src/shell/history/mod.rs +++ b/src/shell/history/mod.rs @@ -67,7 +67,7 @@ impl History { out: &mut impl textmode::Textmode, idx: usize, ) { - let mut entry = self.entries[idx].lock_arc().await; + let mut entry = self.entries[idx].clone().lock_owned().await; entry.render_fullscreen(out); } @@ -78,7 +78,7 @@ impl History { pub async fn resize(&mut self, size: (u16, u16)) { self.size = size; for entry in &self.entries { - entry.lock_arc().await.resize(size).await; + entry.clone().lock_owned().await.resize(size).await; } } @@ -86,10 +86,10 @@ impl History { &mut self, cmdline: &str, env: &Env, - event_w: async_std::channel::Sender<Event>, + event_w: tokio::sync::mpsc::UnboundedSender<Event>, ) -> anyhow::Result<usize> { - let (input_w, input_r) = async_std::channel::unbounded(); - let (resize_w, resize_r) = async_std::channel::unbounded(); + let (input_w, input_r) = tokio::sync::mpsc::unbounded_channel(); + let (resize_w, resize_r) = tokio::sync::mpsc::unbounded_channel(); let entry = crate::mutex::new(Entry::new( cmdline.to_string(), @@ -112,7 +112,7 @@ impl History { } pub async fn entry(&self, idx: usize) -> crate::mutex::Guard<Entry> { - self.entries[idx].lock_arc().await + self.entries[idx].clone().lock_owned().await } pub fn entry_count(&self) -> usize { @@ -173,7 +173,7 @@ impl History { for (idx, entry) in self.entries.iter().enumerate().rev().skip(self.scroll_pos) { - let entry = entry.lock_arc().await; + let entry = entry.clone().lock_owned().await; let focused = focus.map_or(false, |focus| idx == focus); used_lines += entry.lines(self.entry_count(), focused && !scrolling); @@ -221,13 +221,13 @@ fn run_commands( cmdline: String, entry: crate::mutex::Mutex<Entry>, mut env: Env, - input_r: async_std::channel::Receiver<Vec<u8>>, - resize_r: async_std::channel::Receiver<(u16, u16)>, - event_w: async_std::channel::Sender<Event>, + input_r: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>, + resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>, + event_w: tokio::sync::mpsc::UnboundedSender<Event>, ) { - async_std::task::spawn(async move { + tokio::task::spawn(async move { let pty = match pty::Pty::new( - entry.lock_arc().await.size(), + entry.clone().lock_owned().await.size(), &entry, input_r, resize_r, @@ -235,14 +235,12 @@ fn run_commands( ) { Ok(pty) => pty, Err(e) => { - let mut entry = entry.lock_arc().await; + let mut entry = entry.clone().lock_owned().await; entry.process( format!("nbsh: failed to allocate pty: {}\r\n", e) .as_bytes(), ); - env.set_status(async_std::process::ExitStatus::from_raw( - 1 << 8, - )); + env.set_status(std::process::ExitStatus::from_raw(1 << 8)); entry.finish(env, event_w).await; return; } @@ -254,7 +252,7 @@ fn run_commands( { Ok(status) => status, Err(e) => { - let mut entry = entry.lock_arc().await; + let mut entry = entry.clone().lock_owned().await; entry.process( format!( "nbsh: failed to spawn {}: {}\r\n", @@ -262,7 +260,7 @@ fn run_commands( ) .as_bytes(), ); - env.set_status(async_std::process::ExitStatus::from_raw( + env.set_status(std::process::ExitStatus::from_raw( 1 << 8, )); entry.finish(env, event_w).await; @@ -271,7 +269,7 @@ fn run_commands( }; env.set_status(status); - entry.lock_arc().await.finish(env, event_w).await; + entry.clone().lock_owned().await.finish(env, event_w).await; pty.close().await; }); } @@ -280,12 +278,19 @@ async fn spawn_commands( cmdline: &str, pty: &pty::Pty, env: &mut Env, - event_w: async_std::channel::Sender<Event>, -) -> anyhow::Result<async_std::process::ExitStatus> { + event_w: tokio::sync::mpsc::UnboundedSender<Event>, +) -> anyhow::Result<std::process::ExitStatus> { + enum Res { + Read(crate::runner::Event), + Exit(std::io::Result<std::process::ExitStatus>), + } + let mut cmd = pty_process::Command::new(std::env::current_exe()?); cmd.args(&["-c", cmdline, "--status-fd", "3"]); env.apply(&mut cmd); let (from_r, from_w) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC)?; + // Safety: from_r was just opened above and is not used anywhere else + let fh = unsafe { std::fs::File::from_raw_fd(from_r) }; // Safety: dup2 is an async-signal-safe function unsafe { cmd.pre_exec(move || { @@ -293,90 +298,63 @@ async fn spawn_commands( Ok(()) }); } - let child = pty.spawn(cmd)?; + let mut child = pty.spawn(cmd)?; nix::unistd::close(from_w)?; - let (read_w, read_r) = async_std::channel::unbounded(); - let new_read = move || { - let read_w = read_w.clone(); - async_std::task::spawn(async move { - let event = blocking::unblock(move || { - // Safety: from_r was just opened above and is only - // referenced in this closure, which takes ownership of it - // at the start and returns ownership of it at the end - let fh = unsafe { std::fs::File::from_raw_fd(from_r) }; - let event = bincode::deserialize_from(&fh); - let _ = fh.into_raw_fd(); - event - }) - .await; - if read_w.is_closed() { - // we should never drop read_r while there are still valid - // things to read - assert!(event.is_err()); - } else { - read_w.send(event).await.unwrap(); + let (read_w, read_r) = tokio::sync::mpsc::unbounded_channel(); + tokio::task::spawn_blocking(move || loop { + let event = bincode::deserialize_from(&fh); + match event { + Ok(event) => { + read_w.send(event).unwrap(); + } + Err(e) => { + match &*e { + bincode::ErrorKind::Io(io_e) => { + assert!( + io_e.kind() == std::io::ErrorKind::UnexpectedEof + ); + } + e => { + panic!("{}", e); + } + } + break; } - }); - }; - - new_read(); - let mut read_done = false; - let mut exit_done = None; - loop { - enum Res { - Read(bincode::Result<crate::runner::Event>), - Exit(std::io::Result<std::process::ExitStatus>), } + }); - let read_r = read_r.clone(); - let read = async move { Res::Read(read_r.recv().await.unwrap()) }; - let exit = async { - Res::Exit(if exit_done.is_none() { - child.status_no_drop().await - } else { - std::future::pending().await - }) - }; - match read.or(exit).await { - Res::Read(Ok(event)) => match event { + let mut stream: futures_util::stream::SelectAll<_> = [ + tokio_stream::wrappers::UnboundedReceiverStream::new(read_r) + .map(Res::Read) + .boxed(), + futures_util::stream::once(child.wait()) + .map(Res::Exit) + .boxed(), + ] + .into_iter() + .collect(); + let mut exit_status = None; + while let Some(res) = stream.next().await { + match res { + Res::Read(event) => match event { crate::runner::Event::RunPipeline(idx, span) => { - event_w - .send(Event::ChildRunPipeline(idx, span)) - .await - .unwrap(); - new_read(); + event_w.send(Event::ChildRunPipeline(idx, span)).unwrap(); } crate::runner::Event::Suspend(idx) => { - event_w.send(Event::ChildSuspend(idx)).await.unwrap(); - new_read(); + event_w.send(Event::ChildSuspend(idx)).unwrap(); } crate::runner::Event::Exit(new_env) => { *env = new_env; - read_done = true; } }, - Res::Read(Err(e)) => { - if let bincode::ErrorKind::Io(io_e) = &*e { - if io_e.kind() == std::io::ErrorKind::UnexpectedEof { - read_done = true; - } else { - anyhow::bail!(e); - } - } else { - anyhow::bail!(e); - } - } Res::Exit(Ok(status)) => { - exit_done = Some(status); + exit_status = Some(status); } Res::Exit(Err(e)) => { anyhow::bail!(e); } } - if let (true, Some(status)) = (read_done, exit_done) { - nix::unistd::close(from_r)?; - return Ok(status); - } } + Ok(exit_status.unwrap()) } diff --git a/src/shell/history/pty.rs b/src/shell/history/pty.rs index 5a51e73..acfe500 100644 --- a/src/shell/history/pty.rs +++ b/src/shell/history/pty.rs @@ -1,26 +1,26 @@ use crate::shell::prelude::*; pub struct Pty { - pty: async_std::sync::Arc<pty_process::Pty>, - close_w: async_std::channel::Sender<()>, + pts: pty_process::Pts, + close_w: tokio::sync::mpsc::UnboundedSender<()>, } impl Pty { pub fn new( size: (u16, u16), entry: &crate::mutex::Mutex<super::Entry>, - input_r: async_std::channel::Receiver<Vec<u8>>, - resize_r: async_std::channel::Receiver<(u16, u16)>, - event_w: async_std::channel::Sender<Event>, + input_r: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>, + resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>, + event_w: tokio::sync::mpsc::UnboundedSender<Event>, ) -> anyhow::Result<Self> { - let (close_w, close_r) = async_std::channel::unbounded(); + let (close_w, close_r) = tokio::sync::mpsc::unbounded_channel(); let pty = pty_process::Pty::new()?; pty.resize(pty_process::Size::new(size.0, size.1))?; - let pty = async_std::sync::Arc::new(pty); + let pts = pty.pts()?; - async_std::task::spawn(pty_task( - async_std::sync::Arc::clone(&pty), + tokio::task::spawn(pty_task( + pty, crate::mutex::clone(entry), input_r, resize_r, @@ -28,80 +28,74 @@ impl Pty { event_w, )); - Ok(Self { pty, close_w }) + Ok(Self { pts, close_w }) } pub fn spawn( &self, mut cmd: pty_process::Command, - ) -> anyhow::Result<async_std::process::Child> { - Ok(cmd.spawn(&self.pty)?) + ) -> anyhow::Result<tokio::process::Child> { + Ok(cmd.spawn(&self.pts)?) } pub async fn close(&self) { - self.close_w.send(()).await.unwrap(); + self.close_w.send(()).unwrap(); } } async fn pty_task( - pty: async_std::sync::Arc<pty_process::Pty>, + pty: pty_process::Pty, entry: crate::mutex::Mutex<super::Entry>, - input_r: async_std::channel::Receiver<Vec<u8>>, - resize_r: async_std::channel::Receiver<(u16, u16)>, - close_r: async_std::channel::Receiver<()>, - event_w: async_std::channel::Sender<Event>, + input_r: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>, + resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>, + close_r: tokio::sync::mpsc::UnboundedReceiver<()>, + event_w: tokio::sync::mpsc::UnboundedSender<Event>, ) { - loop { - enum Res { - Read(Result<usize, std::io::Error>), - Write(Result<Vec<u8>, async_std::channel::RecvError>), - Resize(Result<(u16, u16), async_std::channel::RecvError>), - Close(Result<(), async_std::channel::RecvError>), - } - let mut buf = [0_u8; 4096]; - let read = async { Res::Read((&*pty).read(&mut buf).await) }; - let write = async { Res::Write(input_r.recv().await) }; - let resize = async { Res::Resize(resize_r.recv().await) }; - let close = async { Res::Close(close_r.recv().await) }; - match read.race(write).race(resize).or(close).await { + enum Res { + Read(Result<bytes::Bytes, std::io::Error>), + Write(Vec<u8>), + Resize((u16, u16)), + Close(()), + } + + let (pty_r, mut pty_w) = pty.into_split(); + let mut stream: futures_util::stream::SelectAll<_> = [ + tokio_util::io::ReaderStream::new(pty_r) + .map(Res::Read) + .boxed(), + tokio_stream::wrappers::UnboundedReceiverStream::new(input_r) + .map(Res::Write) + .boxed(), + tokio_stream::wrappers::UnboundedReceiverStream::new(resize_r) + .map(Res::Resize) + .boxed(), + tokio_stream::wrappers::UnboundedReceiverStream::new(close_r) + .map(Res::Close) + .boxed(), + ] + .into_iter() + .collect(); + while let Some(res) = stream.next().await { + match res { Res::Read(res) => match res { Ok(bytes) => { - entry.lock_arc().await.process(&buf[..bytes]); - event_w.send(Event::PtyOutput).await.unwrap(); + entry.clone().lock_owned().await.process(&bytes); + event_w.send(Event::PtyOutput).unwrap(); } Err(e) => { - if e.raw_os_error() == Some(libc::EIO) { - continue; - } panic!("pty read failed: {:?}", e); } }, - Res::Write(res) => match res { - Ok(bytes) => { - (&*pty).write(&bytes).await.unwrap(); - } - Err(e) => { - panic!("failed to read from input channel: {}", e); - } - }, - Res::Resize(res) => match res { - Ok(size) => { - pty.resize(pty_process::Size::new(size.0, size.1)) - .unwrap(); - } - Err(e) => { - panic!("failed to read from resize channel: {}", e); - } - }, - Res::Close(res) => match res { - Ok(()) => { - event_w.send(Event::PtyClose).await.unwrap(); - return; - } - Err(e) => { - panic!("failed to read from close channel: {}", e); - } - }, + Res::Write(bytes) => { + pty_w.write(&bytes).await.unwrap(); + } + Res::Resize(size) => pty_w + .resize(pty_process::Size::new(size.0, size.1)) + .unwrap(), + Res::Close(()) => { + event_w.send(Event::PtyClose).unwrap(); + return; + } } } } |