From 39287b07f87aba15c4cb0f64d7008ba67289151d Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Tue, 22 Feb 2022 17:02:12 -0500 Subject: another rewrite --- tests/basic.rs | 50 ++++---- tests/behavior.rs | 315 +++++++++++++++++++++++++++++---------------------- tests/fds.rs | 8 +- tests/fds_async.rs | 61 +++++++--- tests/helpers/mod.rs | 23 ++-- tests/pipe.rs | 131 +++++++++++++-------- tests/split.rs | 155 +++++++++++++++++++++++++ tests/winch.rs | 41 ++++--- 8 files changed, 532 insertions(+), 252 deletions(-) create mode 100644 tests/split.rs (limited to 'tests') diff --git a/tests/basic.rs b/tests/basic.rs index cab2c16..6f276e6 100644 --- a/tests/basic.rs +++ b/tests/basic.rs @@ -4,42 +4,50 @@ mod helpers; fn test_cat_blocking() { use std::io::Write as _; - let pty = pty_process::blocking::Pty::new().unwrap(); + let mut pty = pty_process::blocking::Pty::new().unwrap(); pty.resize(pty_process::Size::new(24, 80)).unwrap(); let mut child = pty_process::blocking::Command::new("cat") - .spawn(&pty) + .spawn(&pty.pts().unwrap()) .unwrap(); - (&pty).write_all(b"foo\n").unwrap(); + pty.write_all(b"foo\n").unwrap(); let mut output = helpers::output(&pty); assert_eq!(output.next().unwrap(), "foo\r\n"); assert_eq!(output.next().unwrap(), "foo\r\n"); - (&pty).write_all(&[4u8]).unwrap(); + pty.write_all(&[4u8]).unwrap(); let status = child.wait().unwrap(); assert_eq!(status.code().unwrap(), 0); } #[cfg(feature = "async")] #[test] -fn test_cat_async_std() { - use async_std::io::prelude::WriteExt as _; +fn test_cat_async() { use futures::stream::StreamExt as _; - - let status = async_std::task::block_on(async { - let pty = pty_process::Pty::new().unwrap(); - pty.resize(pty_process::Size::new(24, 80)).unwrap(); - let mut child = pty_process::Command::new("cat").spawn(&pty).unwrap(); - - (&pty).write_all(b"foo\n").await.unwrap(); - - let mut output = helpers::output_async(&pty); - assert_eq!(output.next().await.unwrap(), "foo\r\n"); - assert_eq!(output.next().await.unwrap(), "foo\r\n"); - - (&pty).write_all(&[4u8]).await.unwrap(); - child.status().await.unwrap() - }); + use tokio::io::AsyncWriteExt as _; + + let status = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + let mut pty = pty_process::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); + pty.resize(pty_process::Size::new(24, 80)).unwrap(); + let mut child = + pty_process::Command::new("cat").spawn(&pts).unwrap(); + + let (pty_r, mut pty_w) = pty.split(); + + pty_w.write_all(b"foo\n").await.unwrap(); + + let mut output = helpers::output_async(pty_r); + assert_eq!(output.next().await.unwrap(), "foo\r\n"); + assert_eq!(output.next().await.unwrap(), "foo\r\n"); + + pty_w.write_all(&[4u8]).await.unwrap(); + child.wait().await.unwrap() + }); assert_eq!(status.code().unwrap(), 0); } diff --git a/tests/behavior.rs b/tests/behavior.rs index d45dd14..7969f72 100644 --- a/tests/behavior.rs +++ b/tests/behavior.rs @@ -3,11 +3,12 @@ mod helpers; #[test] fn test_multiple() { let pty = pty_process::blocking::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); pty.resize(pty_process::Size::new(24, 80)).unwrap(); let mut child = pty_process::blocking::Command::new("echo") .arg("foo") - .spawn(&pty) + .spawn(&pts) .unwrap(); let mut output = helpers::output(&pty); @@ -18,10 +19,9 @@ fn test_multiple() { let mut child = pty_process::blocking::Command::new("echo") .arg("bar") - .spawn(&pty) + .spawn(&pts) .unwrap(); - let mut output = helpers::output(&pty); assert_eq!(output.next().unwrap(), "bar\r\n"); let status = child.wait().unwrap(); @@ -33,32 +33,37 @@ fn test_multiple() { fn test_multiple_async() { use futures::stream::StreamExt as _; - async_std::task::block_on(async { - let pty = pty_process::Pty::new().unwrap(); - pty.resize(pty_process::Size::new(24, 80)).unwrap(); + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + let mut pty = pty_process::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); + pty.resize(pty_process::Size::new(24, 80)).unwrap(); - let mut child = pty_process::Command::new("echo") - .arg("foo") - .spawn(&pty) - .unwrap(); + let mut child = pty_process::Command::new("echo") + .arg("foo") + .spawn(&pts) + .unwrap(); + let (pty_r, _) = pty.split(); - let mut output = helpers::output_async(&pty); - assert_eq!(output.next().await.unwrap(), "foo\r\n"); + let mut output = helpers::output_async(pty_r); + assert_eq!(output.next().await.unwrap(), "foo\r\n"); - let status = child.status().await.unwrap(); - assert_eq!(status.code().unwrap(), 0); + let status = child.wait().await.unwrap(); + assert_eq!(status.code().unwrap(), 0); - let mut child = pty_process::Command::new("echo") - .arg("bar") - .spawn(&pty) - .unwrap(); + let mut child = pty_process::Command::new("echo") + .arg("bar") + .spawn(&pts) + .unwrap(); - let mut output = helpers::output_async(&pty); - assert_eq!(output.next().await.unwrap(), "bar\r\n"); + assert_eq!(output.next().await.unwrap(), "bar\r\n"); - let status = child.status().await.unwrap(); - assert_eq!(status.code().unwrap(), 0); - }); + let status = child.wait().await.unwrap(); + assert_eq!(status.code().unwrap(), 0); + }); } #[test] @@ -67,6 +72,7 @@ fn test_multiple_configured() { use std::os::unix::io::FromRawFd as _; let pty = pty_process::blocking::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); pty.resize(pty_process::Size::new(24, 80)).unwrap(); let (stderr_pipe_r, stderr_pipe_w) = nix::unistd::pipe().unwrap(); @@ -90,7 +96,7 @@ fn test_multiple_configured() { Ok(()) }); } - let mut child = cmd.spawn(&pty).unwrap(); + let mut child = cmd.spawn(&pts).unwrap(); let mut output = helpers::output(&pty); assert_eq!(output.next().unwrap(), "foo\r\n"); @@ -110,7 +116,8 @@ fn test_multiple_configured() { let status = child.wait().unwrap(); assert_eq!(status.code().unwrap(), 0); - let mut child = cmd.spawn(&pty).unwrap(); + let mut child = cmd.spawn(&pts).unwrap(); + let mut output = helpers::output(&pty); assert_eq!(output.next().unwrap(), "foo\r\n"); @@ -133,105 +140,126 @@ fn test_multiple_configured() { #[cfg(feature = "async")] #[test] fn test_multiple_configured_async() { - use async_std::io::prelude::BufReadExt as _; use futures::stream::StreamExt as _; use std::os::unix::io::FromRawFd as _; + use tokio::io::AsyncBufReadExt as _; - async_std::task::block_on(async { - let pty = pty_process::Pty::new().unwrap(); - pty.resize(pty_process::Size::new(24, 80)).unwrap(); - - let (stderr_pipe_r, stderr_pipe_w) = nix::unistd::pipe().unwrap(); - let mut stderr_pipe_r = async_std::io::BufReader::new(unsafe { - async_std::fs::File::from_raw_fd(stderr_pipe_r) - }); - let (pre_exec_pipe_r, pre_exec_pipe_w) = nix::unistd::pipe().unwrap(); - let mut pre_exec_pipe_r = async_std::io::BufReader::new(unsafe { - async_std::fs::File::from_raw_fd(pre_exec_pipe_r) - }); - let mut cmd = pty_process::Command::new("perl"); - cmd.arg("-Esay 'foo'; say STDERR 'foo-stderr'; open my $fh, '>&=3'; say $fh 'foo-3';") - .stderr(unsafe { std::process::Stdio::from_raw_fd(stderr_pipe_w) }); - unsafe { - cmd.pre_exec(move || { - nix::unistd::dup2(pre_exec_pipe_w, 3)?; - nix::fcntl::fcntl( - 3, - nix::fcntl::F_SETFD(nix::fcntl::FdFlag::empty()), - )?; - Ok(()) - }); - } - let mut child = cmd.spawn(&pty).unwrap(); - - let mut output = helpers::output_async(&pty); - assert_eq!(output.next().await.unwrap(), "foo\r\n"); - - let mut buf = vec![]; - async_std::future::timeout( - std::time::Duration::from_secs(5), - stderr_pipe_r.read_until(b'\n', &mut buf), - ) - .await - .unwrap() - .unwrap(); - assert_eq!( - std::string::String::from_utf8(buf).unwrap(), - "foo-stderr\n" - ); - - let mut buf = vec![]; - async_std::future::timeout( - std::time::Duration::from_secs(5), - pre_exec_pipe_r.read_until(b'\n', &mut buf), - ) - .await + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() .unwrap() - .unwrap(); - assert_eq!(std::string::String::from_utf8(buf).unwrap(), "foo-3\n"); + .block_on(async { + let mut pty = pty_process::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); + pty.resize(pty_process::Size::new(24, 80)).unwrap(); + let (pty_r, _) = pty.split(); + + let (stderr_pipe_r, stderr_pipe_w) = nix::unistd::pipe().unwrap(); + let mut stderr_pipe_r = tokio::io::BufReader::new(unsafe { + tokio::fs::File::from_raw_fd(stderr_pipe_r) + }); + let (pre_exec_pipe_r, pre_exec_pipe_w) = + nix::unistd::pipe().unwrap(); + let mut pre_exec_pipe_r = tokio::io::BufReader::new(unsafe { + tokio::fs::File::from_raw_fd(pre_exec_pipe_r) + }); + let mut cmd = pty_process::Command::new("perl"); + cmd.arg( + "-Esay 'foo'; \ + say STDERR 'foo-stderr'; \ + open my $fh, '>&=3'; \ + say $fh 'foo-3';", + ) + .stderr(unsafe { + std::process::Stdio::from_raw_fd(stderr_pipe_w) + }); + unsafe { + cmd.pre_exec(move || { + nix::unistd::dup2(pre_exec_pipe_w, 3)?; + nix::fcntl::fcntl( + 3, + nix::fcntl::F_SETFD(nix::fcntl::FdFlag::empty()), + )?; + Ok(()) + }); + } + let mut child = cmd.spawn(&pts).unwrap(); + + let mut output = helpers::output_async(pty_r); + assert_eq!(output.next().await.unwrap(), "foo\r\n"); + + let mut buf = vec![]; + tokio::time::timeout( + std::time::Duration::from_secs(5), + stderr_pipe_r.read_until(b'\n', &mut buf), + ) + .await + .unwrap() + .unwrap(); + assert_eq!( + std::string::String::from_utf8(buf).unwrap(), + "foo-stderr\n" + ); + + let mut buf = vec![]; + tokio::time::timeout( + std::time::Duration::from_secs(5), + pre_exec_pipe_r.read_until(b'\n', &mut buf), + ) + .await + .unwrap() + .unwrap(); + assert_eq!( + std::string::String::from_utf8(buf).unwrap(), + "foo-3\n" + ); - let status = child.status().await.unwrap(); - assert_eq!(status.code().unwrap(), 0); + let status = child.wait().await.unwrap(); + assert_eq!(status.code().unwrap(), 0); - let mut child = cmd.spawn(&pty).unwrap(); + let mut child = cmd.spawn(&pts).unwrap(); - assert_eq!(output.next().await.unwrap(), "foo\r\n"); + assert_eq!(output.next().await.unwrap(), "foo\r\n"); - let mut buf = vec![]; - async_std::future::timeout( - std::time::Duration::from_secs(5), - stderr_pipe_r.read_until(b'\n', &mut buf), - ) - .await - .unwrap() - .unwrap(); - assert_eq!( - std::string::String::from_utf8(buf).unwrap(), - "foo-stderr\n" - ); - - let mut buf = vec![]; - async_std::future::timeout( - std::time::Duration::from_secs(5), - pre_exec_pipe_r.read_until(b'\n', &mut buf), - ) - .await - .unwrap() - .unwrap(); - assert_eq!(std::string::String::from_utf8(buf).unwrap(), "foo-3\n"); + let mut buf = vec![]; + tokio::time::timeout( + std::time::Duration::from_secs(5), + stderr_pipe_r.read_until(b'\n', &mut buf), + ) + .await + .unwrap() + .unwrap(); + assert_eq!( + std::string::String::from_utf8(buf).unwrap(), + "foo-stderr\n" + ); + + let mut buf = vec![]; + tokio::time::timeout( + std::time::Duration::from_secs(5), + pre_exec_pipe_r.read_until(b'\n', &mut buf), + ) + .await + .unwrap() + .unwrap(); + assert_eq!( + std::string::String::from_utf8(buf).unwrap(), + "foo-3\n" + ); - let status = child.status().await.unwrap(); - assert_eq!(status.code().unwrap(), 0); - }); + let status = child.wait().await.unwrap(); + assert_eq!(status.code().unwrap(), 0); + }); } #[test] fn test_controlling_terminal() { let pty = pty_process::blocking::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); pty.resize(pty_process::Size::new(24, 80)).unwrap(); let mut child = pty_process::blocking::Command::new("perl") .arg("-Eopen my $fh, '<', '/dev/tty' or die; if (-t $fh) { say 'true' } else { say 'false' }") - .spawn(&pty) + .spawn(&pts) .unwrap(); let mut output = helpers::output(&pty); @@ -246,29 +274,39 @@ fn test_controlling_terminal() { fn test_controlling_terminal_async() { use futures::stream::StreamExt as _; - async_std::task::block_on(async { - let pty = pty_process::Pty::new().unwrap(); - pty.resize(pty_process::Size::new(24, 80)).unwrap(); - let mut child = pty_process::Command::new("perl") - .arg("-Eopen my $fh, '<', '/dev/tty' or die; if (-t $fh) { say 'true' } else { say 'false' }") - .spawn(&pty) - .unwrap(); - - let mut output = helpers::output_async(&pty); - assert_eq!(output.next().await.unwrap(), "true\r\n"); - - let status = child.status().await.unwrap(); - assert_eq!(status.code().unwrap(), 0); - }); + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + let mut pty = pty_process::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); + pty.resize(pty_process::Size::new(24, 80)).unwrap(); + let (pty_r, _) = pty.split(); + let mut child = pty_process::Command::new("perl") + .arg( + "-Eopen my $fh, '<', '/dev/tty' or die; \ + if (-t $fh) { say 'true' } else { say 'false' }", + ) + .spawn(&pts) + .unwrap(); + + let mut output = helpers::output_async(pty_r); + assert_eq!(output.next().await.unwrap(), "true\r\n"); + + let status = child.wait().await.unwrap(); + assert_eq!(status.code().unwrap(), 0); + }); } #[test] fn test_session_leader() { let pty = pty_process::blocking::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); pty.resize(pty_process::Size::new(24, 80)).unwrap(); let mut child = pty_process::blocking::Command::new("python") .arg("-cimport os; print(os.getpid() == os.getsid(0))") - .spawn(&pty) + .spawn(&pts) .unwrap(); let mut output = helpers::output(&pty); @@ -283,20 +321,25 @@ fn test_session_leader() { fn test_session_leader_async() { use futures::stream::StreamExt as _; - async_std::task::block_on(async { - let pty = pty_process::Pty::new().unwrap(); - pty.resize(pty_process::Size::new(24, 80)).unwrap(); - let mut child = pty_process::Command::new("python") - .arg("-cimport os; print(os.getpid() == os.getsid(0))") - .spawn(&pty) - .unwrap(); - - { - let mut output = helpers::output_async(&pty); + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + let mut pty = pty_process::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); + pty.resize(pty_process::Size::new(24, 80)).unwrap(); + let mut child = pty_process::Command::new("python") + .arg("-cimport os; print(os.getpid() == os.getsid(0))") + .spawn(&pts) + .unwrap(); + + let (pty_r, _) = pty.split(); + let mut output = helpers::output_async(pty_r); assert_eq!(output.next().await.unwrap(), "True\r\n"); - } - let status = child.status().await.unwrap(); - assert_eq!(status.code().unwrap(), 0); - }); + let status = child.wait().await.unwrap(); + eprintln!("{:?}", status); + assert_eq!(status.code().unwrap(), 0); + }); } diff --git a/tests/fds.rs b/tests/fds.rs index 8b64d3f..a641f46 100644 --- a/tests/fds.rs +++ b/tests/fds.rs @@ -5,10 +5,11 @@ fn test_fds() { check_open_fds(); let pty = pty_process::blocking::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); pty.resize(pty_process::Size::new(24, 80)).unwrap(); let mut child = pty_process::blocking::Command::new("perl") .arg("-Efor my $fd (0..255) { open my $fh, \"<&=$fd\"; print $fd if stat $fh }; say") - .spawn(&pty) + .spawn(&pts) .unwrap(); let mut output = helpers::output(&pty); @@ -17,14 +18,16 @@ fn test_fds() { let status = child.wait().unwrap(); assert_eq!(status.code().unwrap(), 0); drop(pty); + drop(pts); check_open_fds(); let pty = pty_process::blocking::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); pty.resize(pty_process::Size::new(24, 80)).unwrap(); let mut child = pty_process::blocking::Command::new("perl") .arg("-Efor my $fd (0..255) { open my $fh, \"<&=$fd\"; print $fd if stat $fh }; say") .stderr(std::process::Stdio::null()) - .spawn(&pty) + .spawn(&pts) .unwrap(); let mut output = helpers::output(&pty); @@ -33,6 +36,7 @@ fn test_fds() { let status = child.wait().unwrap(); assert_eq!(status.code().unwrap(), 0); drop(pty); + drop(pts); check_open_fds(); } diff --git a/tests/fds_async.rs b/tests/fds_async.rs index 9e0bc15..6f3bcde 100644 --- a/tests/fds_async.rs +++ b/tests/fds_async.rs @@ -7,61 +7,86 @@ fn test_fds_async() { check_open_fds(&[0, 1, 2]); - // run once to ensure all of the fds in the async_std machinery are + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + // run once to ensure all of the fds in the tokio machinery are // allocated - async_std::task::block_on(async { - let pty = pty_process::Pty::new().unwrap(); + rt.block_on(async { + let mut pty = pty_process::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); pty.resize(pty_process::Size::new(24, 80)).unwrap(); let mut child = pty_process::Command::new("perl") - .arg("-Efor my $fd (0..255) { open my $fh, \"<&=$fd\"; print $fd if stat $fh }; say") - .spawn(&pty) + .arg( + "-Efor my $fd (0..255) { \ + open my $fh, \"<&=$fd\"; \ + print $fd if stat $fh \ + }; \ + say", + ) + .spawn(&pts) .unwrap(); - let mut output = helpers::output_async(&pty); + let (pty_r, _) = pty.split(); + let mut output = helpers::output_async(pty_r); assert_eq!(output.next().await.unwrap(), "012\r\n"); - let status = child.status().await.unwrap(); + let status = child.wait().await.unwrap(); assert_eq!(status.code().unwrap(), 0); }); - async_std::task::block_on(async { + rt.block_on(async { let fds = get_open_fds(); - let pty = pty_process::Pty::new().unwrap(); + let mut pty = pty_process::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); pty.resize(pty_process::Size::new(24, 80)).unwrap(); let mut child = pty_process::Command::new("perl") - .arg("-Efor my $fd (0..255) { open my $fh, \"<&=$fd\"; print $fd if stat $fh }; say") - .spawn(&pty) + .arg( + "-Efor my $fd (0..255) { \ + open my $fh, \"<&=$fd\"; \ + print $fd if stat $fh \ + }; \ + say", + ) + .spawn(&pts) .unwrap(); - let mut output = helpers::output_async(&pty); + let (pty_r, _) = pty.split(); + let mut output = helpers::output_async(pty_r); assert_eq!(output.next().await.unwrap(), "012\r\n"); - let status = child.status().await.unwrap(); + let status = child.wait().await.unwrap(); assert_eq!(status.code().unwrap(), 0); drop(output); + drop(pts); drop(pty); check_open_fds(&fds); }); - async_std::task::block_on(async { + rt.block_on(async { let fds = get_open_fds(); - let pty = pty_process::Pty::new().unwrap(); + let mut pty = pty_process::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); pty.resize(pty_process::Size::new(24, 80)).unwrap(); let mut child = pty_process::Command::new("perl") .arg("-Efor my $fd (0..255) { open my $fh, \"<&=$fd\"; print $fd if stat $fh }; say") .stderr(std::process::Stdio::null()) - .spawn(&pty) + .spawn(&pts) .unwrap(); - let mut output = helpers::output_async(&pty); + let (pty_r, _) = pty.split(); + let mut output = helpers::output_async(pty_r); assert_eq!(output.next().await.unwrap(), "012\r\n"); - let status = child.status().await.unwrap(); + let status = child.wait().await.unwrap(); assert_eq!(status.code().unwrap(), 0); drop(output); + drop(pts); drop(pty); check_open_fds(&fds); diff --git a/tests/helpers/mod.rs b/tests/helpers/mod.rs index 4fee8df..a46a12b 100644 --- a/tests/helpers/mod.rs +++ b/tests/helpers/mod.rs @@ -31,23 +31,20 @@ pub fn output(pty: &pty_process::blocking::Pty) -> Output<'_> { } #[cfg(feature = "async")] -pub fn output_async( - pty: &pty_process::Pty, -) -> std::pin::Pin + '_>> { - use async_std::io::prelude::BufReadExt as _; +pub fn output_async<'a>( + pty: impl tokio::io::AsyncRead + std::marker::Unpin + 'a, +) -> std::pin::Pin + 'a>> { use futures::FutureExt as _; + use tokio::io::AsyncBufReadExt as _; - let pty = async_std::io::BufReader::new(pty); + let pty = tokio::io::BufReader::new(pty); Box::pin(futures::stream::unfold(pty, |mut pty| async move { Some(( - async_std::future::timeout( - std::time::Duration::from_secs(5), - async { - let mut buf = vec![]; - pty.read_until(b'\n', &mut buf).await.unwrap(); - std::string::String::from_utf8(buf).unwrap() - }, - ) + tokio::time::timeout(std::time::Duration::from_secs(5), async { + let mut buf = vec![]; + pty.read_until(b'\n', &mut buf).await.unwrap(); + std::string::String::from_utf8(buf).unwrap() + }) .map(|x| x.unwrap()) .await, pty, diff --git a/tests/pipe.rs b/tests/pipe.rs index 1bf49f2..e09334b 100644 --- a/tests/pipe.rs +++ b/tests/pipe.rs @@ -2,7 +2,8 @@ fn test_pipe_basic() { use std::os::unix::io::FromRawFd as _; - let (read_fd, write_fd) = nix::unistd::pipe().unwrap(); + let (read_fd, write_fd) = + nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).unwrap(); let mut child_from = std::process::Command::new("seq"); child_from.args(["1", "10"]); @@ -13,56 +14,96 @@ fn test_pipe_basic() { child_to.stdout(std::process::Stdio::piped()); assert!(child_from.status().unwrap().success()); - nix::unistd::close(write_fd).unwrap(); + drop(child_from); let output = child_to.output().unwrap(); assert!(output.status.success()); assert_eq!(output.stdout, b"10\n9\n8\n7\n6\n5\n4\n3\n2\n1\n"); } -#[cfg(feature = "todo")] -// TODO (hangs because i'm still overriding the configured fds) -// #[test] +#[test] +fn test_pipe_blocking() { + use std::io::Read as _; + use std::os::unix::io::FromRawFd as _; + + let (read_fd, write_fd) = + nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).unwrap(); + + let pty_from = pty_process::blocking::Pty::new().unwrap(); + let pts_from = pty_from.pts().unwrap(); + pty_from.resize(pty_process::Size::new(24, 80)).unwrap(); + let mut cmd_from = pty_process::blocking::Command::new("seq"); + cmd_from.args(["1", "10"]); + cmd_from.stdout(unsafe { std::process::Stdio::from_raw_fd(write_fd) }); + let mut child_from = cmd_from.spawn(&pts_from).unwrap(); + + let mut pty_to = pty_process::blocking::Pty::new().unwrap(); + let pts_to = pty_to.pts().unwrap(); + let mut cmd_to = pty_process::blocking::Command::new("tac"); + cmd_to.stdin(unsafe { std::process::Stdio::from_raw_fd(read_fd) }); + let mut child_to = cmd_to.spawn(&pts_to).unwrap(); + + assert!(child_from.wait().unwrap().success()); + drop(cmd_from); + + // wait for the `tac` process to finish generating output (we don't really + // have a good way to detect when that happens) + std::thread::sleep(std::time::Duration::from_millis(100)); + + let mut buf = [0u8; 1024]; + let bytes = pty_to.read(&mut buf).unwrap(); + assert_eq!( + &buf[..bytes], + b"10\r\n9\r\n8\r\n7\r\n6\r\n5\r\n4\r\n3\r\n2\r\n1\r\n" + ); + + assert!(child_to.wait().unwrap().success()); +} + +#[cfg(feature = "async")] +#[test] fn test_pipe_async() { - use async_std::io::ReadExt as _; use std::os::unix::io::FromRawFd as _; + use tokio::io::AsyncReadExt as _; + + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + let (read_fd, write_fd) = + nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).unwrap(); + + let pty_from = pty_process::Pty::new().unwrap(); + let pts_from = pty_from.pts().unwrap(); + pty_from.resize(pty_process::Size::new(24, 80)).unwrap(); + let mut cmd_from = pty_process::Command::new("seq"); + cmd_from.args(["1", "10"]); + cmd_from.stdout(unsafe { + std::process::Stdio::from_raw_fd(write_fd) + }); + let mut child_from = cmd_from.spawn(&pts_from).unwrap(); + + let mut pty_to = pty_process::Pty::new().unwrap(); + let pts_to = pty_to.pts().unwrap(); + let mut cmd_to = pty_process::Command::new("tac"); + cmd_to + .stdin(unsafe { std::process::Stdio::from_raw_fd(read_fd) }); + let mut child_to = cmd_to.spawn(&pts_to).unwrap(); + + assert!(child_from.wait().await.unwrap().success()); + drop(cmd_from); + + // wait for the `tac` process to finish generating output (we + // don't really have a good way to detect when that happens) + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let mut buf = [0u8; 1024]; + let bytes = pty_to.read(&mut buf).await.unwrap(); + assert_eq!( + &buf[..bytes], + b"10\r\n9\r\n8\r\n7\r\n6\r\n5\r\n4\r\n3\r\n2\r\n1\r\n" + ); - let (status_from, status_to) = async_std::task::block_on(async { - let (read_fd, write_fd) = nix::unistd::pipe().unwrap(); - - let pty_from = pty_process::async_std::Pty::new().unwrap(); - pty_from.resize(pty_process::Size::new(24, 80)).unwrap(); - let mut cmd_from = pty_process::async_std::Command::new("seq"); - cmd_from.args(["1", "10"]); - cmd_from - .stdout(unsafe { std::process::Stdio::from_raw_fd(write_fd) }); - let mut child_from = cmd_from.spawn(pty_from).unwrap(); - - let pty_to = pty_process::async_std::Pty::new().unwrap(); - pty_to.resize(pty_process::Size::new(24, 80)).unwrap(); - let mut cmd_to = pty_process::async_std::Command::new("tac"); - cmd_to.stdin(unsafe { std::process::Stdio::from_raw_fd(read_fd) }); - let mut child_to = cmd_to.spawn(pty_to).unwrap(); - - // the pty will echo the written bytes back immediately, but the - // subprocess needs to generate its own output, which takes time, so - // we can't just read immediately (we may just get the echoed bytes). - // because the output generation is happening in the subprocess, we - // also don't have any way to know when (or if!) the subprocess will - // decide to send its output, so sleeping is the best we can do. - async_std::task::sleep(std::time::Duration::from_secs(1)).await; - - let mut buf = [0u8; 1024]; - let bytes = child_to.pty().read(&mut buf).await.unwrap(); - assert_eq!( - &buf[..bytes], - b"10\r\n9\r\n8\r\n7\r\n6\r\n5\r\n4\r\n3\r\n2\r\n1\r\n" - ); - - ( - child_from.status().await.unwrap(), - child_to.status().await.unwrap(), - ) - }); - assert_eq!(status_from.code().unwrap(), 0); - assert_eq!(status_to.code().unwrap(), 0); + assert!(child_to.wait().await.unwrap().success()); + }); } diff --git a/tests/split.rs b/tests/split.rs new file mode 100644 index 0000000..f537f17 --- /dev/null +++ b/tests/split.rs @@ -0,0 +1,155 @@ +mod helpers; + +#[cfg(feature = "async")] +#[test] +fn test_split() { + use futures::stream::StreamExt as _; + use tokio::io::AsyncWriteExt as _; + + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async { + let mut pty = pty_process::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); + pty.resize(pty_process::Size::new(24, 80)).unwrap(); + let mut cmd = pty_process::Command::new("perl"); + cmd.args(["-plE", "BEGIN { $SIG{WINCH} = sub { say 'WINCH' } }"]); + let mut child = cmd.spawn(&pts).unwrap(); + + { + pty.write_all(b"foo\n").await.unwrap(); + let (pty_r, _) = pty.split(); + let mut output = helpers::output_async(pty_r); + assert_eq!(output.next().await.unwrap(), "foo\r\n"); + assert_eq!(output.next().await.unwrap(), "foo\r\n"); + } + + { + let (pty_r, mut pty_w) = pty.split(); + pty_w.write_all(b"foo\n").await.unwrap(); + let mut output = helpers::output_async(pty_r); + assert_eq!(output.next().await.unwrap(), "foo\r\n"); + assert_eq!(output.next().await.unwrap(), "foo\r\n"); + } + + { + let (pty_r, pty_w) = pty.split(); + pty_w.resize(pty_process::Size::new(25, 80)).unwrap(); + let mut output = helpers::output_async(pty_r); + assert_eq!(output.next().await.unwrap(), "WINCH\r\n"); + } + + pty.write_all(&[4u8]).await.unwrap(); + child.wait().await.unwrap() + }); +} + +#[cfg(feature = "async")] +#[test] +fn test_into_split() { + use tokio::io::{AsyncBufReadExt as _, AsyncWriteExt as _}; + + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async { + let mut pty = pty_process::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); + pty.resize(pty_process::Size::new(24, 80)).unwrap(); + let mut cmd = pty_process::Command::new("perl"); + cmd.args(["-plE", "BEGIN { $SIG{WINCH} = sub { say 'WINCH' } }"]); + let mut child = cmd.spawn(&pts).unwrap(); + + { + pty.write_all(b"foo\n").await.unwrap(); + let (pty_r, pty_w) = pty.into_split(); + let mut ptybuf = tokio::io::BufReader::new(pty_r); + for _ in 0..2 { + let mut buf = vec![]; + tokio::time::timeout( + std::time::Duration::from_secs(5), + ptybuf.read_until(b'\n', &mut buf), + ) + .await + .unwrap() + .unwrap(); + assert_eq!(&buf[..], b"foo\r\n"); + } + pty = ptybuf.into_inner().unsplit(pty_w).unwrap(); + } + + { + let (pty_r, mut pty_w) = pty.into_split(); + pty_w.write_all(b"foo\n").await.unwrap(); + let mut ptybuf = tokio::io::BufReader::new(pty_r); + for _ in 0..2 { + let mut buf = vec![]; + tokio::time::timeout( + std::time::Duration::from_secs(5), + ptybuf.read_until(b'\n', &mut buf), + ) + .await + .unwrap() + .unwrap(); + assert_eq!(&buf[..], b"foo\r\n"); + } + pty = ptybuf.into_inner().unsplit(pty_w).unwrap(); + } + + { + let (pty_r, pty_w) = pty.into_split(); + pty_w.resize(pty_process::Size::new(25, 80)).unwrap(); + let mut ptybuf = tokio::io::BufReader::new(pty_r); + let mut buf = vec![]; + tokio::time::timeout( + std::time::Duration::from_secs(5), + ptybuf.read_until(b'\n', &mut buf), + ) + .await + .unwrap() + .unwrap(); + assert_eq!(&buf[..], b"WINCH\r\n"); + pty = ptybuf.into_inner().unsplit(pty_w).unwrap(); + } + + pty.write_all(&[4u8]).await.unwrap(); + child.wait().await.unwrap() + }); +} + +#[cfg(feature = "async")] +#[test] +fn test_into_split_error() { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async { + let pty1 = pty_process::Pty::new().unwrap(); + let pty2 = pty_process::Pty::new().unwrap(); + + let (pty1_r, pty1_w) = pty1.into_split(); + let (pty2_r, pty2_w) = pty2.into_split(); + + let (pty1_r, pty2_w) = if let Err(pty_process::Error::Unsplit(r, w)) = + pty1_r.unsplit(pty2_w) + { + (r, w) + } else { + panic!("fail"); + }; + let (pty2_r, pty1_w) = if let Err(pty_process::Error::Unsplit(r, w)) = + pty2_r.unsplit(pty1_w) + { + (r, w) + } else { + panic!("fail"); + }; + + let _pty1 = pty1_r.unsplit(pty1_w).unwrap(); + let _pty2 = pty2_r.unsplit(pty2_w).unwrap(); + }); +} diff --git a/tests/winch.rs b/tests/winch.rs index 50447cb..6b16ca9 100644 --- a/tests/winch.rs +++ b/tests/winch.rs @@ -4,14 +4,15 @@ mod helpers; fn test_winch_std() { use std::io::Write as _; - let pty = pty_process::blocking::Pty::new().unwrap(); + let mut pty = pty_process::blocking::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); pty.resize(pty_process::Size::new(24, 80)).unwrap(); let mut child = pty_process::blocking::Command::new("perl") .args(&[ "-E", "$|++; $SIG{WINCH} = sub { say 'WINCH' }; say 'started'; <>", ]) - .spawn(&pty) + .spawn(&pts) .unwrap(); let mut output = helpers::output(&pty); @@ -20,7 +21,7 @@ fn test_winch_std() { pty.resize(pty_process::Size::new(25, 80)).unwrap(); assert_eq!(output.next().unwrap(), "WINCH\r\n"); - (&pty).write_all(b"\n").unwrap(); + pty.write_all(b"\n").unwrap(); let status = child.wait().unwrap(); assert_eq!(status.code().unwrap(), 0); } @@ -28,28 +29,34 @@ fn test_winch_std() { #[cfg(feature = "async")] #[test] fn test_winch_async() { - use async_std::io::prelude::WriteExt as _; use futures::stream::StreamExt as _; - - let status = async_std::task::block_on(async { - let pty = pty_process::Pty::new().unwrap(); - pty.resize(pty_process::Size::new(24, 80)).unwrap(); - let mut child = pty_process::Command::new("perl") + use tokio::io::AsyncWriteExt as _; + + let status = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + let mut pty = pty_process::Pty::new().unwrap(); + let pts = pty.pts().unwrap(); + pty.resize(pty_process::Size::new(24, 80)).unwrap(); + let mut child = pty_process::Command::new("perl") .args(&[ "-E", "$|++; $SIG{WINCH} = sub { say 'WINCH' }; say 'started'; <>", ]) - .spawn(&pty) + .spawn(&pts) .unwrap(); - let mut output = helpers::output_async(&pty); - assert_eq!(output.next().await.unwrap(), "started\r\n"); + let (pty_r, mut pty_w) = pty.split(); + let mut output = helpers::output_async(pty_r); + assert_eq!(output.next().await.unwrap(), "started\r\n"); - pty.resize(pty_process::Size::new(25, 80)).unwrap(); - assert_eq!(output.next().await.unwrap(), "WINCH\r\n"); + pty_w.resize(pty_process::Size::new(25, 80)).unwrap(); + assert_eq!(output.next().await.unwrap(), "WINCH\r\n"); - (&pty).write_all(b"\n").await.unwrap(); - child.status().await.unwrap() - }); + pty_w.write_all(b"\n").await.unwrap(); + child.wait().await.unwrap() + }); assert_eq!(status.code().unwrap(), 0); } -- cgit v1.2.3-54-g00ecf