summaryrefslogtreecommitdiffstats
path: root/src/shell/history/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/shell/history/mod.rs')
-rw-r--r--src/shell/history/mod.rs152
1 files changed, 65 insertions, 87 deletions
diff --git a/src/shell/history/mod.rs b/src/shell/history/mod.rs
index 1bc4e62..2eeab0b 100644
--- a/src/shell/history/mod.rs
+++ b/src/shell/history/mod.rs
@@ -67,7 +67,7 @@ impl History {
out: &mut impl textmode::Textmode,
idx: usize,
) {
- let mut entry = self.entries[idx].lock_arc().await;
+ let mut entry = self.entries[idx].clone().lock_owned().await;
entry.render_fullscreen(out);
}
@@ -78,7 +78,7 @@ impl History {
pub async fn resize(&mut self, size: (u16, u16)) {
self.size = size;
for entry in &self.entries {
- entry.lock_arc().await.resize(size).await;
+ entry.clone().lock_owned().await.resize(size).await;
}
}
@@ -86,10 +86,10 @@ impl History {
&mut self,
cmdline: &str,
env: &Env,
- event_w: async_std::channel::Sender<Event>,
+ event_w: tokio::sync::mpsc::UnboundedSender<Event>,
) -> anyhow::Result<usize> {
- let (input_w, input_r) = async_std::channel::unbounded();
- let (resize_w, resize_r) = async_std::channel::unbounded();
+ let (input_w, input_r) = tokio::sync::mpsc::unbounded_channel();
+ let (resize_w, resize_r) = tokio::sync::mpsc::unbounded_channel();
let entry = crate::mutex::new(Entry::new(
cmdline.to_string(),
@@ -112,7 +112,7 @@ impl History {
}
pub async fn entry(&self, idx: usize) -> crate::mutex::Guard<Entry> {
- self.entries[idx].lock_arc().await
+ self.entries[idx].clone().lock_owned().await
}
pub fn entry_count(&self) -> usize {
@@ -173,7 +173,7 @@ impl History {
for (idx, entry) in
self.entries.iter().enumerate().rev().skip(self.scroll_pos)
{
- let entry = entry.lock_arc().await;
+ let entry = entry.clone().lock_owned().await;
let focused = focus.map_or(false, |focus| idx == focus);
used_lines +=
entry.lines(self.entry_count(), focused && !scrolling);
@@ -221,13 +221,13 @@ fn run_commands(
cmdline: String,
entry: crate::mutex::Mutex<Entry>,
mut env: Env,
- input_r: async_std::channel::Receiver<Vec<u8>>,
- resize_r: async_std::channel::Receiver<(u16, u16)>,
- event_w: async_std::channel::Sender<Event>,
+ input_r: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
+ resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>,
+ event_w: tokio::sync::mpsc::UnboundedSender<Event>,
) {
- async_std::task::spawn(async move {
+ tokio::task::spawn(async move {
let pty = match pty::Pty::new(
- entry.lock_arc().await.size(),
+ entry.clone().lock_owned().await.size(),
&entry,
input_r,
resize_r,
@@ -235,14 +235,12 @@ fn run_commands(
) {
Ok(pty) => pty,
Err(e) => {
- let mut entry = entry.lock_arc().await;
+ let mut entry = entry.clone().lock_owned().await;
entry.process(
format!("nbsh: failed to allocate pty: {}\r\n", e)
.as_bytes(),
);
- env.set_status(async_std::process::ExitStatus::from_raw(
- 1 << 8,
- ));
+ env.set_status(std::process::ExitStatus::from_raw(1 << 8));
entry.finish(env, event_w).await;
return;
}
@@ -254,7 +252,7 @@ fn run_commands(
{
Ok(status) => status,
Err(e) => {
- let mut entry = entry.lock_arc().await;
+ let mut entry = entry.clone().lock_owned().await;
entry.process(
format!(
"nbsh: failed to spawn {}: {}\r\n",
@@ -262,7 +260,7 @@ fn run_commands(
)
.as_bytes(),
);
- env.set_status(async_std::process::ExitStatus::from_raw(
+ env.set_status(std::process::ExitStatus::from_raw(
1 << 8,
));
entry.finish(env, event_w).await;
@@ -271,7 +269,7 @@ fn run_commands(
};
env.set_status(status);
- entry.lock_arc().await.finish(env, event_w).await;
+ entry.clone().lock_owned().await.finish(env, event_w).await;
pty.close().await;
});
}
@@ -280,12 +278,19 @@ async fn spawn_commands(
cmdline: &str,
pty: &pty::Pty,
env: &mut Env,
- event_w: async_std::channel::Sender<Event>,
-) -> anyhow::Result<async_std::process::ExitStatus> {
+ event_w: tokio::sync::mpsc::UnboundedSender<Event>,
+) -> anyhow::Result<std::process::ExitStatus> {
+ enum Res {
+ Read(crate::runner::Event),
+ Exit(std::io::Result<std::process::ExitStatus>),
+ }
+
let mut cmd = pty_process::Command::new(std::env::current_exe()?);
cmd.args(&["-c", cmdline, "--status-fd", "3"]);
env.apply(&mut cmd);
let (from_r, from_w) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC)?;
+ // Safety: from_r was just opened above and is not used anywhere else
+ let fh = unsafe { std::fs::File::from_raw_fd(from_r) };
// Safety: dup2 is an async-signal-safe function
unsafe {
cmd.pre_exec(move || {
@@ -293,90 +298,63 @@ async fn spawn_commands(
Ok(())
});
}
- let child = pty.spawn(cmd)?;
+ let mut child = pty.spawn(cmd)?;
nix::unistd::close(from_w)?;
- let (read_w, read_r) = async_std::channel::unbounded();
- let new_read = move || {
- let read_w = read_w.clone();
- async_std::task::spawn(async move {
- let event = blocking::unblock(move || {
- // Safety: from_r was just opened above and is only
- // referenced in this closure, which takes ownership of it
- // at the start and returns ownership of it at the end
- let fh = unsafe { std::fs::File::from_raw_fd(from_r) };
- let event = bincode::deserialize_from(&fh);
- let _ = fh.into_raw_fd();
- event
- })
- .await;
- if read_w.is_closed() {
- // we should never drop read_r while there are still valid
- // things to read
- assert!(event.is_err());
- } else {
- read_w.send(event).await.unwrap();
+ let (read_w, read_r) = tokio::sync::mpsc::unbounded_channel();
+ tokio::task::spawn_blocking(move || loop {
+ let event = bincode::deserialize_from(&fh);
+ match event {
+ Ok(event) => {
+ read_w.send(event).unwrap();
+ }
+ Err(e) => {
+ match &*e {
+ bincode::ErrorKind::Io(io_e) => {
+ assert!(
+ io_e.kind() == std::io::ErrorKind::UnexpectedEof
+ );
+ }
+ e => {
+ panic!("{}", e);
+ }
+ }
+ break;
}
- });
- };
-
- new_read();
- let mut read_done = false;
- let mut exit_done = None;
- loop {
- enum Res {
- Read(bincode::Result<crate::runner::Event>),
- Exit(std::io::Result<std::process::ExitStatus>),
}
+ });
- let read_r = read_r.clone();
- let read = async move { Res::Read(read_r.recv().await.unwrap()) };
- let exit = async {
- Res::Exit(if exit_done.is_none() {
- child.status_no_drop().await
- } else {
- std::future::pending().await
- })
- };
- match read.or(exit).await {
- Res::Read(Ok(event)) => match event {
+ let mut stream: futures_util::stream::SelectAll<_> = [
+ tokio_stream::wrappers::UnboundedReceiverStream::new(read_r)
+ .map(Res::Read)
+ .boxed(),
+ futures_util::stream::once(child.wait())
+ .map(Res::Exit)
+ .boxed(),
+ ]
+ .into_iter()
+ .collect();
+ let mut exit_status = None;
+ while let Some(res) = stream.next().await {
+ match res {
+ Res::Read(event) => match event {
crate::runner::Event::RunPipeline(idx, span) => {
- event_w
- .send(Event::ChildRunPipeline(idx, span))
- .await
- .unwrap();
- new_read();
+ event_w.send(Event::ChildRunPipeline(idx, span)).unwrap();
}
crate::runner::Event::Suspend(idx) => {
- event_w.send(Event::ChildSuspend(idx)).await.unwrap();
- new_read();
+ event_w.send(Event::ChildSuspend(idx)).unwrap();
}
crate::runner::Event::Exit(new_env) => {
*env = new_env;
- read_done = true;
}
},
- Res::Read(Err(e)) => {
- if let bincode::ErrorKind::Io(io_e) = &*e {
- if io_e.kind() == std::io::ErrorKind::UnexpectedEof {
- read_done = true;
- } else {
- anyhow::bail!(e);
- }
- } else {
- anyhow::bail!(e);
- }
- }
Res::Exit(Ok(status)) => {
- exit_done = Some(status);
+ exit_status = Some(status);
}
Res::Exit(Err(e)) => {
anyhow::bail!(e);
}
}
- if let (true, Some(status)) = (read_done, exit_done) {
- nix::unistd::close(from_r)?;
- return Ok(status);
- }
}
+ Ok(exit_status.unwrap())
}