diff options
Diffstat (limited to 'src/shell/history')
-rw-r--r-- | src/shell/history/job.rs | 152 | ||||
-rw-r--r-- | src/shell/history/pty.rs | 104 |
2 files changed, 130 insertions, 126 deletions
diff --git a/src/shell/history/job.rs b/src/shell/history/job.rs index 365a06d..d3d112a 100644 --- a/src/shell/history/job.rs +++ b/src/shell/history/job.rs @@ -19,7 +19,7 @@ impl Job { let state = std::sync::Arc::new(std::sync::Mutex::new( State::Running((0, 0)), )); - tokio::task::spawn(job_task( + tokio::spawn(Self::task( child, fh, std::sync::Arc::clone(&state), @@ -66,6 +66,83 @@ impl Job { } }); } + + async fn task( + mut child: tokio::process::Child, + fh: std::fs::File, + state: std::sync::Arc<std::sync::Mutex<State>>, + env: Env, + event_w: crate::shell::event::Writer, + ) { + enum Res { + Read(crate::runner::Event), + Exit(std::io::Result<std::process::ExitStatus>), + } + + let (read_w, read_r) = tokio::sync::mpsc::unbounded_channel(); + tokio::task::spawn_blocking(move || loop { + let event = bincode::deserialize_from(&fh); + match event { + Ok(event) => { + read_w.send(event).unwrap(); + } + Err(e) => { + match &*e { + bincode::ErrorKind::Io(io_e) => { + assert!( + io_e.kind() + == std::io::ErrorKind::UnexpectedEof + ); + } + e => { + panic!("{}", e); + } + } + break; + } + } + }); + + let mut stream: futures_util::stream::SelectAll<_> = [ + tokio_stream::wrappers::UnboundedReceiverStream::new(read_r) + .map(Res::Read) + .boxed(), + futures_util::stream::once(child.wait()) + .map(Res::Exit) + .boxed(), + ] + .into_iter() + .collect(); + let mut exit_status = None; + let mut new_env = None; + while let Some(res) = stream.next().await { + match res { + Res::Read(event) => match event { + crate::runner::Event::RunPipeline(new_span) => { + // we could just update the span in place here, but we + // do this as an event so that we can also trigger a + // refresh + event_w.send(Event::ChildRunPipeline( + env.idx(), + new_span, + )); + } + crate::runner::Event::Suspend => { + event_w.send(Event::ChildSuspend(env.idx())); + } + crate::runner::Event::Exit(env) => { + new_env = Some(env); + } + }, + Res::Exit(status) => { + exit_status = Some(status.unwrap()); + } + } + } + *state.lock().unwrap() = + State::Exited(ExitInfo::new(exit_status.unwrap())); + event_w.send(Event::ChildExit(env.idx(), new_env)); + } } pub enum State { @@ -108,79 +185,6 @@ impl ExitInfo { } } -async fn job_task( - mut child: tokio::process::Child, - fh: std::fs::File, - state: std::sync::Arc<std::sync::Mutex<State>>, - env: Env, - event_w: crate::shell::event::Writer, -) { - enum Res { - Read(crate::runner::Event), - Exit(std::io::Result<std::process::ExitStatus>), - } - - let (read_w, read_r) = tokio::sync::mpsc::unbounded_channel(); - tokio::task::spawn_blocking(move || loop { - let event = bincode::deserialize_from(&fh); - match event { - Ok(event) => { - read_w.send(event).unwrap(); - } - Err(e) => { - match &*e { - bincode::ErrorKind::Io(io_e) => { - assert!( - io_e.kind() == std::io::ErrorKind::UnexpectedEof - ); - } - e => { - panic!("{}", e); - } - } - break; - } - } - }); - - let mut stream: futures_util::stream::SelectAll<_> = [ - tokio_stream::wrappers::UnboundedReceiverStream::new(read_r) - .map(Res::Read) - .boxed(), - futures_util::stream::once(child.wait()) - .map(Res::Exit) - .boxed(), - ] - .into_iter() - .collect(); - let mut exit_status = None; - let mut new_env = None; - while let Some(res) = stream.next().await { - match res { - Res::Read(event) => match event { - crate::runner::Event::RunPipeline(new_span) => { - // we could just update the span in place here, but we do - // this as an event so that we can also trigger a refresh - event_w - .send(Event::ChildRunPipeline(env.idx(), new_span)); - } - crate::runner::Event::Suspend => { - event_w.send(Event::ChildSuspend(env.idx())); - } - crate::runner::Event::Exit(env) => { - new_env = Some(env); - } - }, - Res::Exit(status) => { - exit_status = Some(status.unwrap()); - } - } - } - *state.lock().unwrap() = - State::Exited(ExitInfo::new(exit_status.unwrap())); - event_w.send(Event::ChildExit(env.idx(), new_env)); -} - fn spawn_command( cmdline: &str, env: &Env, diff --git a/src/shell/history/pty.rs b/src/shell/history/pty.rs index 49681d4..91b9cbb 100644 --- a/src/shell/history/pty.rs +++ b/src/shell/history/pty.rs @@ -26,7 +26,7 @@ impl Pty { super::pty::Vt::new(size), )); - tokio::task::spawn(pty_task( + tokio::spawn(Self::task( pty, std::sync::Arc::clone(&vt), request_r, @@ -66,6 +66,57 @@ impl Pty { #[allow(clippy::let_underscore_drop)] let _ = self.request_w.send(Request::Resize(size.0, size.1)); } + + async fn task( + pty: pty_process::Pty, + vt: std::sync::Arc<std::sync::Mutex<super::pty::Vt>>, + request_r: tokio::sync::mpsc::UnboundedReceiver<Request>, + event_w: crate::shell::event::Writer, + ) { + enum Res { + Read(Result<bytes::Bytes, std::io::Error>), + Request(Request), + } + + let (pty_r, mut pty_w) = pty.into_split(); + let mut stream: futures_util::stream::SelectAll<_> = [ + tokio_util::io::ReaderStream::new(pty_r) + .map(Res::Read) + .boxed(), + tokio_stream::wrappers::UnboundedReceiverStream::new(request_r) + .map(Res::Request) + .boxed(), + ] + .into_iter() + .collect(); + while let Some(res) = stream.next().await { + match res { + Res::Read(res) => match res { + Ok(bytes) => { + vt.lock().unwrap().process(&bytes); + event_w.send(Event::PtyOutput); + } + Err(e) => { + // this means that there are no longer any open pts + // fds. we could alternately signal this through an + // explicit channel at ChildExit time, but this seems + // reliable enough. + if e.raw_os_error() == Some(libc::EIO) { + return; + } + panic!("pty read failed: {:?}", e); + } + }, + Res::Request(Request::Input(bytes)) => { + pty_w.write(&bytes).await.unwrap(); + } + Res::Request(Request::Resize(row, col)) => { + pty_w.resize(pty_process::Size::new(row, col)).unwrap(); + vt.lock().unwrap().set_size((row, col)); + } + } + } + } } pub struct Vt { @@ -161,54 +212,3 @@ impl Vt { last_row } } - -async fn pty_task( - pty: pty_process::Pty, - vt: std::sync::Arc<std::sync::Mutex<super::pty::Vt>>, - request_r: tokio::sync::mpsc::UnboundedReceiver<Request>, - event_w: crate::shell::event::Writer, -) { - enum Res { - Read(Result<bytes::Bytes, std::io::Error>), - Request(Request), - } - - let (pty_r, mut pty_w) = pty.into_split(); - let mut stream: futures_util::stream::SelectAll<_> = [ - tokio_util::io::ReaderStream::new(pty_r) - .map(Res::Read) - .boxed(), - tokio_stream::wrappers::UnboundedReceiverStream::new(request_r) - .map(Res::Request) - .boxed(), - ] - .into_iter() - .collect(); - while let Some(res) = stream.next().await { - match res { - Res::Read(res) => match res { - Ok(bytes) => { - vt.lock().unwrap().process(&bytes); - event_w.send(Event::PtyOutput); - } - Err(e) => { - // this means that there are no longer any open pts fds. - // we could alternately signal this through an explicit - // channel at ChildExit time, but this seems reliable - // enough. - if e.raw_os_error() == Some(libc::EIO) { - return; - } - panic!("pty read failed: {:?}", e); - } - }, - Res::Request(Request::Input(bytes)) => { - pty_w.write(&bytes).await.unwrap(); - } - Res::Request(Request::Resize(row, col)) => { - pty_w.resize(pty_process::Size::new(row, col)).unwrap(); - vt.lock().unwrap().set_size((row, col)); - } - } - } -} |