From a8923b3b9b52c19a4ca1b98319651ea7a05c9aac Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Thu, 24 Oct 2019 01:07:40 -0400 Subject: add some tests --- tests/run/mod.rs | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 tests/run/mod.rs (limited to 'tests/run/mod.rs') diff --git a/tests/run/mod.rs b/tests/run/mod.rs new file mode 100644 index 0000000..d64789c --- /dev/null +++ b/tests/run/mod.rs @@ -0,0 +1,41 @@ +#![allow(dead_code)] + +use futures::future::Future as _; + +pub fn future( + fut: impl futures::future::Future + Send + 'static, +) -> Result { + let (wchan, rchan) = std::sync::mpsc::channel(); + run(fut.then(move |res| { + wchan.send(res).unwrap(); + futures::future::ok(()) + })); + rchan.iter().next().unwrap() +} + +pub fn stream( + stream: impl futures::stream::Stream + Send + 'static, +) -> Result, E> { + let (wchan, rchan) = std::sync::mpsc::channel(); + let wchan_ok = wchan.clone(); + let wchan_err = wchan.clone(); + drop(wchan); + run(stream + .for_each(move |i| { + wchan_ok.send(Ok(i)).unwrap(); + futures::future::ok(()) + }) + .map_err(move |e| { + wchan_err.send(Err(e)).unwrap(); + })); + rchan.iter().collect() +} + +// replacement for tokio::run which keeps panics on the main thread (so that +// they don't get swallowed up and ignored if they happen on other threads) +fn run( + fut: impl futures::future::Future + Send + 'static, +) { + let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap(); + runtime.block_on(fut).unwrap() +} -- cgit v1.2.3-54-g00ecf