summaryrefslogtreecommitdiffstats
path: root/src/shell/history/job.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/shell/history/job.rs')
-rw-r--r--src/shell/history/job.rs152
1 files changed, 78 insertions, 74 deletions
diff --git a/src/shell/history/job.rs b/src/shell/history/job.rs
index 365a06d..d3d112a 100644
--- a/src/shell/history/job.rs
+++ b/src/shell/history/job.rs
@@ -19,7 +19,7 @@ impl Job {
let state = std::sync::Arc::new(std::sync::Mutex::new(
State::Running((0, 0)),
));
- tokio::task::spawn(job_task(
+ tokio::spawn(Self::task(
child,
fh,
std::sync::Arc::clone(&state),
@@ -66,6 +66,83 @@ impl Job {
}
});
}
+
+ async fn task(
+ mut child: tokio::process::Child,
+ fh: std::fs::File,
+ state: std::sync::Arc<std::sync::Mutex<State>>,
+ env: Env,
+ event_w: crate::shell::event::Writer,
+ ) {
+ enum Res {
+ Read(crate::runner::Event),
+ Exit(std::io::Result<std::process::ExitStatus>),
+ }
+
+ let (read_w, read_r) = tokio::sync::mpsc::unbounded_channel();
+ tokio::task::spawn_blocking(move || loop {
+ let event = bincode::deserialize_from(&fh);
+ match event {
+ Ok(event) => {
+ read_w.send(event).unwrap();
+ }
+ Err(e) => {
+ match &*e {
+ bincode::ErrorKind::Io(io_e) => {
+ assert!(
+ io_e.kind()
+ == std::io::ErrorKind::UnexpectedEof
+ );
+ }
+ e => {
+ panic!("{}", e);
+ }
+ }
+ break;
+ }
+ }
+ });
+
+ let mut stream: futures_util::stream::SelectAll<_> = [
+ tokio_stream::wrappers::UnboundedReceiverStream::new(read_r)
+ .map(Res::Read)
+ .boxed(),
+ futures_util::stream::once(child.wait())
+ .map(Res::Exit)
+ .boxed(),
+ ]
+ .into_iter()
+ .collect();
+ let mut exit_status = None;
+ let mut new_env = None;
+ while let Some(res) = stream.next().await {
+ match res {
+ Res::Read(event) => match event {
+ crate::runner::Event::RunPipeline(new_span) => {
+ // we could just update the span in place here, but we
+ // do this as an event so that we can also trigger a
+ // refresh
+ event_w.send(Event::ChildRunPipeline(
+ env.idx(),
+ new_span,
+ ));
+ }
+ crate::runner::Event::Suspend => {
+ event_w.send(Event::ChildSuspend(env.idx()));
+ }
+ crate::runner::Event::Exit(env) => {
+ new_env = Some(env);
+ }
+ },
+ Res::Exit(status) => {
+ exit_status = Some(status.unwrap());
+ }
+ }
+ }
+ *state.lock().unwrap() =
+ State::Exited(ExitInfo::new(exit_status.unwrap()));
+ event_w.send(Event::ChildExit(env.idx(), new_env));
+ }
}
pub enum State {
@@ -108,79 +185,6 @@ impl ExitInfo {
}
}
-async fn job_task(
- mut child: tokio::process::Child,
- fh: std::fs::File,
- state: std::sync::Arc<std::sync::Mutex<State>>,
- env: Env,
- event_w: crate::shell::event::Writer,
-) {
- enum Res {
- Read(crate::runner::Event),
- Exit(std::io::Result<std::process::ExitStatus>),
- }
-
- let (read_w, read_r) = tokio::sync::mpsc::unbounded_channel();
- tokio::task::spawn_blocking(move || loop {
- let event = bincode::deserialize_from(&fh);
- match event {
- Ok(event) => {
- read_w.send(event).unwrap();
- }
- Err(e) => {
- match &*e {
- bincode::ErrorKind::Io(io_e) => {
- assert!(
- io_e.kind() == std::io::ErrorKind::UnexpectedEof
- );
- }
- e => {
- panic!("{}", e);
- }
- }
- break;
- }
- }
- });
-
- let mut stream: futures_util::stream::SelectAll<_> = [
- tokio_stream::wrappers::UnboundedReceiverStream::new(read_r)
- .map(Res::Read)
- .boxed(),
- futures_util::stream::once(child.wait())
- .map(Res::Exit)
- .boxed(),
- ]
- .into_iter()
- .collect();
- let mut exit_status = None;
- let mut new_env = None;
- while let Some(res) = stream.next().await {
- match res {
- Res::Read(event) => match event {
- crate::runner::Event::RunPipeline(new_span) => {
- // we could just update the span in place here, but we do
- // this as an event so that we can also trigger a refresh
- event_w
- .send(Event::ChildRunPipeline(env.idx(), new_span));
- }
- crate::runner::Event::Suspend => {
- event_w.send(Event::ChildSuspend(env.idx()));
- }
- crate::runner::Event::Exit(env) => {
- new_env = Some(env);
- }
- },
- Res::Exit(status) => {
- exit_status = Some(status.unwrap());
- }
- }
- }
- *state.lock().unwrap() =
- State::Exited(ExitInfo::new(exit_status.unwrap()));
- event_w.send(Event::ChildExit(env.idx(), new_env));
-}
-
fn spawn_command(
cmdline: &str,
env: &Env,