diff options
Diffstat (limited to 'src/shell/history/mod.rs')
-rw-r--r-- | src/shell/history/mod.rs | 152 |
1 files changed, 65 insertions, 87 deletions
diff --git a/src/shell/history/mod.rs b/src/shell/history/mod.rs index 1bc4e62..2eeab0b 100644 --- a/src/shell/history/mod.rs +++ b/src/shell/history/mod.rs @@ -67,7 +67,7 @@ impl History { out: &mut impl textmode::Textmode, idx: usize, ) { - let mut entry = self.entries[idx].lock_arc().await; + let mut entry = self.entries[idx].clone().lock_owned().await; entry.render_fullscreen(out); } @@ -78,7 +78,7 @@ impl History { pub async fn resize(&mut self, size: (u16, u16)) { self.size = size; for entry in &self.entries { - entry.lock_arc().await.resize(size).await; + entry.clone().lock_owned().await.resize(size).await; } } @@ -86,10 +86,10 @@ impl History { &mut self, cmdline: &str, env: &Env, - event_w: async_std::channel::Sender<Event>, + event_w: tokio::sync::mpsc::UnboundedSender<Event>, ) -> anyhow::Result<usize> { - let (input_w, input_r) = async_std::channel::unbounded(); - let (resize_w, resize_r) = async_std::channel::unbounded(); + let (input_w, input_r) = tokio::sync::mpsc::unbounded_channel(); + let (resize_w, resize_r) = tokio::sync::mpsc::unbounded_channel(); let entry = crate::mutex::new(Entry::new( cmdline.to_string(), @@ -112,7 +112,7 @@ impl History { } pub async fn entry(&self, idx: usize) -> crate::mutex::Guard<Entry> { - self.entries[idx].lock_arc().await + self.entries[idx].clone().lock_owned().await } pub fn entry_count(&self) -> usize { @@ -173,7 +173,7 @@ impl History { for (idx, entry) in self.entries.iter().enumerate().rev().skip(self.scroll_pos) { - let entry = entry.lock_arc().await; + let entry = entry.clone().lock_owned().await; let focused = focus.map_or(false, |focus| idx == focus); used_lines += entry.lines(self.entry_count(), focused && !scrolling); @@ -221,13 +221,13 @@ fn run_commands( cmdline: String, entry: crate::mutex::Mutex<Entry>, mut env: Env, - input_r: async_std::channel::Receiver<Vec<u8>>, - resize_r: async_std::channel::Receiver<(u16, u16)>, - event_w: async_std::channel::Sender<Event>, + input_r: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>, + resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>, + event_w: tokio::sync::mpsc::UnboundedSender<Event>, ) { - async_std::task::spawn(async move { + tokio::task::spawn(async move { let pty = match pty::Pty::new( - entry.lock_arc().await.size(), + entry.clone().lock_owned().await.size(), &entry, input_r, resize_r, @@ -235,14 +235,12 @@ fn run_commands( ) { Ok(pty) => pty, Err(e) => { - let mut entry = entry.lock_arc().await; + let mut entry = entry.clone().lock_owned().await; entry.process( format!("nbsh: failed to allocate pty: {}\r\n", e) .as_bytes(), ); - env.set_status(async_std::process::ExitStatus::from_raw( - 1 << 8, - )); + env.set_status(std::process::ExitStatus::from_raw(1 << 8)); entry.finish(env, event_w).await; return; } @@ -254,7 +252,7 @@ fn run_commands( { Ok(status) => status, Err(e) => { - let mut entry = entry.lock_arc().await; + let mut entry = entry.clone().lock_owned().await; entry.process( format!( "nbsh: failed to spawn {}: {}\r\n", @@ -262,7 +260,7 @@ fn run_commands( ) .as_bytes(), ); - env.set_status(async_std::process::ExitStatus::from_raw( + env.set_status(std::process::ExitStatus::from_raw( 1 << 8, )); entry.finish(env, event_w).await; @@ -271,7 +269,7 @@ fn run_commands( }; env.set_status(status); - entry.lock_arc().await.finish(env, event_w).await; + entry.clone().lock_owned().await.finish(env, event_w).await; pty.close().await; }); } @@ -280,12 +278,19 @@ async fn spawn_commands( cmdline: &str, pty: &pty::Pty, env: &mut Env, - event_w: async_std::channel::Sender<Event>, -) -> anyhow::Result<async_std::process::ExitStatus> { + event_w: tokio::sync::mpsc::UnboundedSender<Event>, +) -> anyhow::Result<std::process::ExitStatus> { + enum Res { + Read(crate::runner::Event), + Exit(std::io::Result<std::process::ExitStatus>), + } + let mut cmd = pty_process::Command::new(std::env::current_exe()?); cmd.args(&["-c", cmdline, "--status-fd", "3"]); env.apply(&mut cmd); let (from_r, from_w) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC)?; + // Safety: from_r was just opened above and is not used anywhere else + let fh = unsafe { std::fs::File::from_raw_fd(from_r) }; // Safety: dup2 is an async-signal-safe function unsafe { cmd.pre_exec(move || { @@ -293,90 +298,63 @@ async fn spawn_commands( Ok(()) }); } - let child = pty.spawn(cmd)?; + let mut child = pty.spawn(cmd)?; nix::unistd::close(from_w)?; - let (read_w, read_r) = async_std::channel::unbounded(); - let new_read = move || { - let read_w = read_w.clone(); - async_std::task::spawn(async move { - let event = blocking::unblock(move || { - // Safety: from_r was just opened above and is only - // referenced in this closure, which takes ownership of it - // at the start and returns ownership of it at the end - let fh = unsafe { std::fs::File::from_raw_fd(from_r) }; - let event = bincode::deserialize_from(&fh); - let _ = fh.into_raw_fd(); - event - }) - .await; - if read_w.is_closed() { - // we should never drop read_r while there are still valid - // things to read - assert!(event.is_err()); - } else { - read_w.send(event).await.unwrap(); + let (read_w, read_r) = tokio::sync::mpsc::unbounded_channel(); + tokio::task::spawn_blocking(move || loop { + let event = bincode::deserialize_from(&fh); + match event { + Ok(event) => { + read_w.send(event).unwrap(); + } + Err(e) => { + match &*e { + bincode::ErrorKind::Io(io_e) => { + assert!( + io_e.kind() == std::io::ErrorKind::UnexpectedEof + ); + } + e => { + panic!("{}", e); + } + } + break; } - }); - }; - - new_read(); - let mut read_done = false; - let mut exit_done = None; - loop { - enum Res { - Read(bincode::Result<crate::runner::Event>), - Exit(std::io::Result<std::process::ExitStatus>), } + }); - let read_r = read_r.clone(); - let read = async move { Res::Read(read_r.recv().await.unwrap()) }; - let exit = async { - Res::Exit(if exit_done.is_none() { - child.status_no_drop().await - } else { - std::future::pending().await - }) - }; - match read.or(exit).await { - Res::Read(Ok(event)) => match event { + let mut stream: futures_util::stream::SelectAll<_> = [ + tokio_stream::wrappers::UnboundedReceiverStream::new(read_r) + .map(Res::Read) + .boxed(), + futures_util::stream::once(child.wait()) + .map(Res::Exit) + .boxed(), + ] + .into_iter() + .collect(); + let mut exit_status = None; + while let Some(res) = stream.next().await { + match res { + Res::Read(event) => match event { crate::runner::Event::RunPipeline(idx, span) => { - event_w - .send(Event::ChildRunPipeline(idx, span)) - .await - .unwrap(); - new_read(); + event_w.send(Event::ChildRunPipeline(idx, span)).unwrap(); } crate::runner::Event::Suspend(idx) => { - event_w.send(Event::ChildSuspend(idx)).await.unwrap(); - new_read(); + event_w.send(Event::ChildSuspend(idx)).unwrap(); } crate::runner::Event::Exit(new_env) => { *env = new_env; - read_done = true; } }, - Res::Read(Err(e)) => { - if let bincode::ErrorKind::Io(io_e) = &*e { - if io_e.kind() == std::io::ErrorKind::UnexpectedEof { - read_done = true; - } else { - anyhow::bail!(e); - } - } else { - anyhow::bail!(e); - } - } Res::Exit(Ok(status)) => { - exit_done = Some(status); + exit_status = Some(status); } Res::Exit(Err(e)) => { anyhow::bail!(e); } } - if let (true, Some(status)) = (read_done, exit_done) { - nix::unistd::close(from_r)?; - return Ok(status); - } } + Ok(exit_status.unwrap()) } |