diff options
Diffstat (limited to 'src/shell/history/job.rs')
-rw-r--r-- | src/shell/history/job.rs | 152 |
1 files changed, 78 insertions, 74 deletions
diff --git a/src/shell/history/job.rs b/src/shell/history/job.rs index 365a06d..d3d112a 100644 --- a/src/shell/history/job.rs +++ b/src/shell/history/job.rs @@ -19,7 +19,7 @@ impl Job { let state = std::sync::Arc::new(std::sync::Mutex::new( State::Running((0, 0)), )); - tokio::task::spawn(job_task( + tokio::spawn(Self::task( child, fh, std::sync::Arc::clone(&state), @@ -66,6 +66,83 @@ impl Job { } }); } + + async fn task( + mut child: tokio::process::Child, + fh: std::fs::File, + state: std::sync::Arc<std::sync::Mutex<State>>, + env: Env, + event_w: crate::shell::event::Writer, + ) { + enum Res { + Read(crate::runner::Event), + Exit(std::io::Result<std::process::ExitStatus>), + } + + let (read_w, read_r) = tokio::sync::mpsc::unbounded_channel(); + tokio::task::spawn_blocking(move || loop { + let event = bincode::deserialize_from(&fh); + match event { + Ok(event) => { + read_w.send(event).unwrap(); + } + Err(e) => { + match &*e { + bincode::ErrorKind::Io(io_e) => { + assert!( + io_e.kind() + == std::io::ErrorKind::UnexpectedEof + ); + } + e => { + panic!("{}", e); + } + } + break; + } + } + }); + + let mut stream: futures_util::stream::SelectAll<_> = [ + tokio_stream::wrappers::UnboundedReceiverStream::new(read_r) + .map(Res::Read) + .boxed(), + futures_util::stream::once(child.wait()) + .map(Res::Exit) + .boxed(), + ] + .into_iter() + .collect(); + let mut exit_status = None; + let mut new_env = None; + while let Some(res) = stream.next().await { + match res { + Res::Read(event) => match event { + crate::runner::Event::RunPipeline(new_span) => { + // we could just update the span in place here, but we + // do this as an event so that we can also trigger a + // refresh + event_w.send(Event::ChildRunPipeline( + env.idx(), + new_span, + )); + } + crate::runner::Event::Suspend => { + event_w.send(Event::ChildSuspend(env.idx())); + } + crate::runner::Event::Exit(env) => { + new_env = Some(env); + } + }, + Res::Exit(status) => { + exit_status = Some(status.unwrap()); + } + } + } + *state.lock().unwrap() = + State::Exited(ExitInfo::new(exit_status.unwrap())); + event_w.send(Event::ChildExit(env.idx(), new_env)); + } } pub enum State { @@ -108,79 +185,6 @@ impl ExitInfo { } } -async fn job_task( - mut child: tokio::process::Child, - fh: std::fs::File, - state: std::sync::Arc<std::sync::Mutex<State>>, - env: Env, - event_w: crate::shell::event::Writer, -) { - enum Res { - Read(crate::runner::Event), - Exit(std::io::Result<std::process::ExitStatus>), - } - - let (read_w, read_r) = tokio::sync::mpsc::unbounded_channel(); - tokio::task::spawn_blocking(move || loop { - let event = bincode::deserialize_from(&fh); - match event { - Ok(event) => { - read_w.send(event).unwrap(); - } - Err(e) => { - match &*e { - bincode::ErrorKind::Io(io_e) => { - assert!( - io_e.kind() == std::io::ErrorKind::UnexpectedEof - ); - } - e => { - panic!("{}", e); - } - } - break; - } - } - }); - - let mut stream: futures_util::stream::SelectAll<_> = [ - tokio_stream::wrappers::UnboundedReceiverStream::new(read_r) - .map(Res::Read) - .boxed(), - futures_util::stream::once(child.wait()) - .map(Res::Exit) - .boxed(), - ] - .into_iter() - .collect(); - let mut exit_status = None; - let mut new_env = None; - while let Some(res) = stream.next().await { - match res { - Res::Read(event) => match event { - crate::runner::Event::RunPipeline(new_span) => { - // we could just update the span in place here, but we do - // this as an event so that we can also trigger a refresh - event_w - .send(Event::ChildRunPipeline(env.idx(), new_span)); - } - crate::runner::Event::Suspend => { - event_w.send(Event::ChildSuspend(env.idx())); - } - crate::runner::Event::Exit(env) => { - new_env = Some(env); - } - }, - Res::Exit(status) => { - exit_status = Some(status.unwrap()); - } - } - } - *state.lock().unwrap() = - State::Exited(ExitInfo::new(exit_status.unwrap())); - event_w.send(Event::ChildExit(env.idx(), new_env)); -} - fn spawn_command( cmdline: &str, env: &Env, |