From a8923b3b9b52c19a4ca1b98319651ea7a05c9aac Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Thu, 24 Oct 2019 01:07:40 -0400 Subject: add some tests --- Cargo.toml | 3 + tests/basic.rs | 206 ++++++++++++++++++++++++++++++++++++++++++++++++++++ tests/errors.rs | 88 ++++++++++++++++++++++ tests/processing.rs | 111 ++++++++++++++++++++++++++++ tests/run/mod.rs | 41 +++++++++++ 5 files changed, 449 insertions(+) create mode 100644 tests/basic.rs create mode 100644 tests/errors.rs create mode 100644 tests/processing.rs create mode 100644 tests/run/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 0a9f315..ed883db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,3 +6,6 @@ edition = "2018" [dependencies] futures = "0.1" + +[dev-dependencies] +tokio = "0.1" diff --git a/tests/basic.rs b/tests/basic.rs new file mode 100644 index 0000000..8cfebcd --- /dev/null +++ b/tests/basic.rs @@ -0,0 +1,206 @@ +extern crate component_future; + +mod run; + +#[test] +fn test_basic_future() { + struct TwoFutures { + fut1: Option< + Box< + dyn futures::future::Future + + Send, + >, + >, + fut2: Option< + Box< + dyn futures::future::Future + + Send, + >, + >, + val: u32, + } + + impl TwoFutures { + fn new(fut1: F1, fut2: F2) -> Self + where + F1: futures::future::Future + + Send + + 'static, + F2: futures::future::Future + + Send + + 'static, + { + Self { + fut1: Some(Box::new(fut1)), + fut2: Some(Box::new(fut2)), + val: 1, + } + } + } + + #[allow(clippy::type_complexity)] + impl TwoFutures { + const POLL_FNS: + &'static [&'static dyn for<'a> Fn( + &'a mut Self, + ) + -> component_future::Poll< + u32, + String, + >] = &[ + &Self::poll_future_1, + &Self::poll_future_2, + &Self::poll_return, + ]; + + fn poll_future_1(&mut self) -> component_future::Poll { + if let Some(fut1) = &mut self.fut1 { + let val = component_future::try_ready!(fut1.poll()); + self.val += val; + self.fut1.take(); + Ok(component_future::Async::DidWork) + } else { + Ok(component_future::Async::NothingToDo) + } + } + + fn poll_future_2(&mut self) -> component_future::Poll { + if self.fut1.is_some() { + return Ok(component_future::Async::NothingToDo); + } + + if let Some(fut2) = &mut self.fut2 { + let val = component_future::try_ready!(fut2.poll()); + self.val *= val; + self.fut2.take(); + Ok(component_future::Async::DidWork) + } else { + Ok(component_future::Async::NothingToDo) + } + } + + fn poll_return(&mut self) -> component_future::Poll { + if self.fut1.is_some() || self.fut2.is_some() { + return Ok(component_future::Async::NothingToDo); + } + + Ok(component_future::Async::Ready(self.val)) + } + } + + impl futures::future::Future for TwoFutures { + type Item = u32; + type Error = String; + + fn poll(&mut self) -> futures::Poll { + component_future::poll_future(self, Self::POLL_FNS) + } + } + + let cfut = + TwoFutures::new(futures::future::ok(3), futures::future::ok(5)); + let i = run::future(cfut); + assert_eq!(i, Ok(20)); +} + +#[test] +fn test_basic_stream() { + struct TwoFutures { + fut1: Option< + Box< + dyn futures::future::Future + + Send, + >, + >, + fut2: Option< + Box< + dyn futures::future::Future + + Send, + >, + >, + } + + impl TwoFutures { + fn new(fut1: F1, fut2: F2) -> Self + where + F1: futures::future::Future + + Send + + 'static, + F2: futures::future::Future + + Send + + 'static, + { + Self { + fut1: Some(Box::new(fut1)), + fut2: Some(Box::new(fut2)), + } + } + } + + impl TwoFutures { + #[allow(clippy::type_complexity)] + const POLL_FNS: + &'static [&'static dyn for<'a> Fn( + &'a mut Self, + ) + -> component_future::Poll< + Option, + String, + >] = &[ + &Self::poll_future_1, + &Self::poll_future_2, + &Self::poll_return, + ]; + + fn poll_future_1( + &mut self, + ) -> component_future::Poll, String> { + if let Some(fut1) = &mut self.fut1 { + let val = component_future::try_ready!(fut1.poll()); + self.fut1.take(); + Ok(component_future::Async::Ready(Some(val))) + } else { + Ok(component_future::Async::NothingToDo) + } + } + + fn poll_future_2( + &mut self, + ) -> component_future::Poll, String> { + if self.fut1.is_some() { + return Ok(component_future::Async::NothingToDo); + } + + if let Some(fut2) = &mut self.fut2 { + let val = component_future::try_ready!(fut2.poll()); + self.fut2.take(); + Ok(component_future::Async::Ready(Some(val))) + } else { + Ok(component_future::Async::NothingToDo) + } + } + + fn poll_return( + &mut self, + ) -> component_future::Poll, String> { + if self.fut1.is_some() || self.fut2.is_some() { + return Ok(component_future::Async::NothingToDo); + } + + Ok(component_future::Async::Ready(None)) + } + } + + impl futures::stream::Stream for TwoFutures { + type Item = u32; + type Error = String; + + fn poll(&mut self) -> futures::Poll, Self::Error> { + component_future::poll_stream(self, Self::POLL_FNS) + } + } + let cstream = + TwoFutures::new(futures::future::ok(3), futures::future::ok(5)); + let is = run::stream(cstream); + assert_eq!(is, Ok(vec![3, 5])); +} diff --git a/tests/errors.rs b/tests/errors.rs new file mode 100644 index 0000000..646c64d --- /dev/null +++ b/tests/errors.rs @@ -0,0 +1,88 @@ +// this fails because once there are no more events actively being processed, +// the stream doesn't have anything else to do (so it can't return Ready), but +// also no underlying future or stream has returned NotReady (so it can't +// return NotReady), so it has no valid action to take. + +mod run; + +#[derive(Debug, PartialEq, Eq)] +struct InputEvent(u32); +#[derive(Debug, PartialEq, Eq)] +struct OutputEvent(u32); + +impl InputEvent { + fn into_output_event( + self, + ) -> impl futures::future::Future + { + let InputEvent(i) = self; + futures::future::ok(OutputEvent(i)) + } +} + +enum State { + Waiting, + Processing( + Box< + dyn futures::future::Future + + Send, + >, + ), +} + +struct IdleStream { + state: State, +} + +impl IdleStream { + fn new() -> Self { + Self { + state: State::Waiting, + } + } + + fn process(&mut self, event: InputEvent) { + self.state = State::Processing(Box::new(event.into_output_event())); + } +} + +impl IdleStream { + #[allow(clippy::type_complexity)] + const POLL_FNS: + &'static [&'static dyn for<'a> Fn( + &'a mut Self, + ) + -> component_future::Poll< + Option, + String, + >] = &[&Self::poll_state]; + + fn poll_state( + &mut self, + ) -> component_future::Poll, String> { + if let State::Processing(fut) = &mut self.state { + let output_event = component_future::try_ready!(fut.poll()); + self.state = State::Waiting; + Ok(component_future::Async::Ready(Some(output_event))) + } else { + Ok(component_future::Async::NothingToDo) + } + } +} + +impl futures::stream::Stream for IdleStream { + type Item = OutputEvent; + type Error = String; + + fn poll(&mut self) -> futures::Poll, Self::Error> { + component_future::poll_stream(self, Self::POLL_FNS) + } +} + +#[test] +#[should_panic] +fn test_panic() { + let mut stream = IdleStream::new(); + stream.process(InputEvent(1)); + let _ = run::stream(stream); +} diff --git a/tests/processing.rs b/tests/processing.rs new file mode 100644 index 0000000..4446dc6 --- /dev/null +++ b/tests/processing.rs @@ -0,0 +1,111 @@ +mod run; + +#[derive(Debug, PartialEq, Eq)] +struct InputEvent(u32); +#[derive(Debug, PartialEq, Eq)] +struct OutputEvent(u32); + +impl InputEvent { + fn into_output_event( + self, + ) -> impl futures::future::Future + { + let InputEvent(i) = self; + futures::future::ok(OutputEvent(i)) + } +} + +enum State { + Reading, + Processing( + Box< + dyn futures::future::Future + + Send, + >, + ), +} + +struct Stream { + input: Box< + dyn futures::stream::Stream + Send, + >, + state: State, +} + +impl Stream { + fn new( + input: Box< + dyn futures::stream::Stream + + Send, + >, + ) -> Self { + Self { + input, + state: State::Reading, + } + } +} + +impl Stream { + #[allow(clippy::type_complexity)] + const POLL_FNS: + &'static [&'static dyn for<'a> Fn( + &'a mut Self, + ) + -> component_future::Poll< + Option, + String, + >] = &[&Self::poll_input, &Self::poll_state]; + + fn poll_input( + &mut self, + ) -> component_future::Poll, String> { + if let State::Processing(..) = self.state { + return Ok(component_future::Async::NothingToDo); + } + + if let Some(input_event) = + component_future::try_ready!(self.input.poll()) + { + self.state = + State::Processing(Box::new(input_event.into_output_event())); + Ok(component_future::Async::DidWork) + } else { + Ok(component_future::Async::Ready(None)) + } + } + + fn poll_state( + &mut self, + ) -> component_future::Poll, String> { + if let State::Processing(fut) = &mut self.state { + let output_event = component_future::try_ready!(fut.poll()); + self.state = State::Reading; + Ok(component_future::Async::Ready(Some(output_event))) + } else { + Ok(component_future::Async::NothingToDo) + } + } +} + +impl futures::stream::Stream for Stream { + type Item = OutputEvent; + type Error = String; + + fn poll(&mut self) -> futures::Poll, Self::Error> { + component_future::poll_stream(self, Self::POLL_FNS) + } +} + +#[test] +fn test_processing() { + let vals = vec![5, 3, 1, 35]; + let stream = Stream::new(Box::new(futures::stream::iter_ok( + vals.clone().into_iter().map(InputEvent), + ))); + let events = run::stream(stream); + assert_eq!( + events, + Ok(vals.clone().into_iter().map(OutputEvent).collect()) + ) +} diff --git a/tests/run/mod.rs b/tests/run/mod.rs new file mode 100644 index 0000000..d64789c --- /dev/null +++ b/tests/run/mod.rs @@ -0,0 +1,41 @@ +#![allow(dead_code)] + +use futures::future::Future as _; + +pub fn future( + fut: impl futures::future::Future + Send + 'static, +) -> Result { + let (wchan, rchan) = std::sync::mpsc::channel(); + run(fut.then(move |res| { + wchan.send(res).unwrap(); + futures::future::ok(()) + })); + rchan.iter().next().unwrap() +} + +pub fn stream( + stream: impl futures::stream::Stream + Send + 'static, +) -> Result, E> { + let (wchan, rchan) = std::sync::mpsc::channel(); + let wchan_ok = wchan.clone(); + let wchan_err = wchan.clone(); + drop(wchan); + run(stream + .for_each(move |i| { + wchan_ok.send(Ok(i)).unwrap(); + futures::future::ok(()) + }) + .map_err(move |e| { + wchan_err.send(Err(e)).unwrap(); + })); + rchan.iter().collect() +} + +// replacement for tokio::run which keeps panics on the main thread (so that +// they don't get swallowed up and ignored if they happen on other threads) +fn run( + fut: impl futures::future::Future + Send + 'static, +) { + let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap(); + runtime.block_on(fut).unwrap() +} -- cgit v1.2.3