aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Luehrs <doy@tozt.net>2019-10-24 01:07:40 -0400
committerJesse Luehrs <doy@tozt.net>2019-10-24 01:07:40 -0400
commita8923b3b9b52c19a4ca1b98319651ea7a05c9aac (patch)
tree212401cba3d68c0e86928bac2202e2a9ed6bd597
parent4f2e13a8008e11d9498fcb38f5315b51aa908a66 (diff)
downloadcomponent-future-a8923b3b9b52c19a4ca1b98319651ea7a05c9aac.tar.gz
component-future-a8923b3b9b52c19a4ca1b98319651ea7a05c9aac.zip
add some tests
-rw-r--r--Cargo.toml3
-rw-r--r--tests/basic.rs206
-rw-r--r--tests/errors.rs88
-rw-r--r--tests/processing.rs111
-rw-r--r--tests/run/mod.rs41
5 files changed, 449 insertions, 0 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 0a9f315..ed883db 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,3 +6,6 @@ edition = "2018"
[dependencies]
futures = "0.1"
+
+[dev-dependencies]
+tokio = "0.1"
diff --git a/tests/basic.rs b/tests/basic.rs
new file mode 100644
index 0000000..8cfebcd
--- /dev/null
+++ b/tests/basic.rs
@@ -0,0 +1,206 @@
+extern crate component_future;
+
+mod run;
+
+#[test]
+fn test_basic_future() {
+ struct TwoFutures {
+ fut1: Option<
+ Box<
+ dyn futures::future::Future<Item = u32, Error = String>
+ + Send,
+ >,
+ >,
+ fut2: Option<
+ Box<
+ dyn futures::future::Future<Item = u32, Error = String>
+ + Send,
+ >,
+ >,
+ val: u32,
+ }
+
+ impl TwoFutures {
+ fn new<F1, F2>(fut1: F1, fut2: F2) -> Self
+ where
+ F1: futures::future::Future<Item = u32, Error = String>
+ + Send
+ + 'static,
+ F2: futures::future::Future<Item = u32, Error = String>
+ + Send
+ + 'static,
+ {
+ Self {
+ fut1: Some(Box::new(fut1)),
+ fut2: Some(Box::new(fut2)),
+ val: 1,
+ }
+ }
+ }
+
+ #[allow(clippy::type_complexity)]
+ impl TwoFutures {
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> component_future::Poll<
+ u32,
+ String,
+ >] = &[
+ &Self::poll_future_1,
+ &Self::poll_future_2,
+ &Self::poll_return,
+ ];
+
+ fn poll_future_1(&mut self) -> component_future::Poll<u32, String> {
+ if let Some(fut1) = &mut self.fut1 {
+ let val = component_future::try_ready!(fut1.poll());
+ self.val += val;
+ self.fut1.take();
+ Ok(component_future::Async::DidWork)
+ } else {
+ Ok(component_future::Async::NothingToDo)
+ }
+ }
+
+ fn poll_future_2(&mut self) -> component_future::Poll<u32, String> {
+ if self.fut1.is_some() {
+ return Ok(component_future::Async::NothingToDo);
+ }
+
+ if let Some(fut2) = &mut self.fut2 {
+ let val = component_future::try_ready!(fut2.poll());
+ self.val *= val;
+ self.fut2.take();
+ Ok(component_future::Async::DidWork)
+ } else {
+ Ok(component_future::Async::NothingToDo)
+ }
+ }
+
+ fn poll_return(&mut self) -> component_future::Poll<u32, String> {
+ if self.fut1.is_some() || self.fut2.is_some() {
+ return Ok(component_future::Async::NothingToDo);
+ }
+
+ Ok(component_future::Async::Ready(self.val))
+ }
+ }
+
+ impl futures::future::Future for TwoFutures {
+ type Item = u32;
+ type Error = String;
+
+ fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
+ component_future::poll_future(self, Self::POLL_FNS)
+ }
+ }
+
+ let cfut =
+ TwoFutures::new(futures::future::ok(3), futures::future::ok(5));
+ let i = run::future(cfut);
+ assert_eq!(i, Ok(20));
+}
+
+#[test]
+fn test_basic_stream() {
+ struct TwoFutures {
+ fut1: Option<
+ Box<
+ dyn futures::future::Future<Item = u32, Error = String>
+ + Send,
+ >,
+ >,
+ fut2: Option<
+ Box<
+ dyn futures::future::Future<Item = u32, Error = String>
+ + Send,
+ >,
+ >,
+ }
+
+ impl TwoFutures {
+ fn new<F1, F2>(fut1: F1, fut2: F2) -> Self
+ where
+ F1: futures::future::Future<Item = u32, Error = String>
+ + Send
+ + 'static,
+ F2: futures::future::Future<Item = u32, Error = String>
+ + Send
+ + 'static,
+ {
+ Self {
+ fut1: Some(Box::new(fut1)),
+ fut2: Some(Box::new(fut2)),
+ }
+ }
+ }
+
+ impl TwoFutures {
+ #[allow(clippy::type_complexity)]
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> component_future::Poll<
+ Option<u32>,
+ String,
+ >] = &[
+ &Self::poll_future_1,
+ &Self::poll_future_2,
+ &Self::poll_return,
+ ];
+
+ fn poll_future_1(
+ &mut self,
+ ) -> component_future::Poll<Option<u32>, String> {
+ if let Some(fut1) = &mut self.fut1 {
+ let val = component_future::try_ready!(fut1.poll());
+ self.fut1.take();
+ Ok(component_future::Async::Ready(Some(val)))
+ } else {
+ Ok(component_future::Async::NothingToDo)
+ }
+ }
+
+ fn poll_future_2(
+ &mut self,
+ ) -> component_future::Poll<Option<u32>, String> {
+ if self.fut1.is_some() {
+ return Ok(component_future::Async::NothingToDo);
+ }
+
+ if let Some(fut2) = &mut self.fut2 {
+ let val = component_future::try_ready!(fut2.poll());
+ self.fut2.take();
+ Ok(component_future::Async::Ready(Some(val)))
+ } else {
+ Ok(component_future::Async::NothingToDo)
+ }
+ }
+
+ fn poll_return(
+ &mut self,
+ ) -> component_future::Poll<Option<u32>, String> {
+ if self.fut1.is_some() || self.fut2.is_some() {
+ return Ok(component_future::Async::NothingToDo);
+ }
+
+ Ok(component_future::Async::Ready(None))
+ }
+ }
+
+ impl futures::stream::Stream for TwoFutures {
+ type Item = u32;
+ type Error = String;
+
+ fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
+ component_future::poll_stream(self, Self::POLL_FNS)
+ }
+ }
+ let cstream =
+ TwoFutures::new(futures::future::ok(3), futures::future::ok(5));
+ let is = run::stream(cstream);
+ assert_eq!(is, Ok(vec![3, 5]));
+}
diff --git a/tests/errors.rs b/tests/errors.rs
new file mode 100644
index 0000000..646c64d
--- /dev/null
+++ b/tests/errors.rs
@@ -0,0 +1,88 @@
+// this fails because once there are no more events actively being processed,
+// the stream doesn't have anything else to do (so it can't return Ready), but
+// also no underlying future or stream has returned NotReady (so it can't
+// return NotReady), so it has no valid action to take.
+
+mod run;
+
+#[derive(Debug, PartialEq, Eq)]
+struct InputEvent(u32);
+#[derive(Debug, PartialEq, Eq)]
+struct OutputEvent(u32);
+
+impl InputEvent {
+ fn into_output_event(
+ self,
+ ) -> impl futures::future::Future<Item = OutputEvent, Error = String>
+ {
+ let InputEvent(i) = self;
+ futures::future::ok(OutputEvent(i))
+ }
+}
+
+enum State {
+ Waiting,
+ Processing(
+ Box<
+ dyn futures::future::Future<Item = OutputEvent, Error = String>
+ + Send,
+ >,
+ ),
+}
+
+struct IdleStream {
+ state: State,
+}
+
+impl IdleStream {
+ fn new() -> Self {
+ Self {
+ state: State::Waiting,
+ }
+ }
+
+ fn process(&mut self, event: InputEvent) {
+ self.state = State::Processing(Box::new(event.into_output_event()));
+ }
+}
+
+impl IdleStream {
+ #[allow(clippy::type_complexity)]
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> component_future::Poll<
+ Option<OutputEvent>,
+ String,
+ >] = &[&Self::poll_state];
+
+ fn poll_state(
+ &mut self,
+ ) -> component_future::Poll<Option<OutputEvent>, String> {
+ if let State::Processing(fut) = &mut self.state {
+ let output_event = component_future::try_ready!(fut.poll());
+ self.state = State::Waiting;
+ Ok(component_future::Async::Ready(Some(output_event)))
+ } else {
+ Ok(component_future::Async::NothingToDo)
+ }
+ }
+}
+
+impl futures::stream::Stream for IdleStream {
+ type Item = OutputEvent;
+ type Error = String;
+
+ fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
+ component_future::poll_stream(self, Self::POLL_FNS)
+ }
+}
+
+#[test]
+#[should_panic]
+fn test_panic() {
+ let mut stream = IdleStream::new();
+ stream.process(InputEvent(1));
+ let _ = run::stream(stream);
+}
diff --git a/tests/processing.rs b/tests/processing.rs
new file mode 100644
index 0000000..4446dc6
--- /dev/null
+++ b/tests/processing.rs
@@ -0,0 +1,111 @@
+mod run;
+
+#[derive(Debug, PartialEq, Eq)]
+struct InputEvent(u32);
+#[derive(Debug, PartialEq, Eq)]
+struct OutputEvent(u32);
+
+impl InputEvent {
+ fn into_output_event(
+ self,
+ ) -> impl futures::future::Future<Item = OutputEvent, Error = String>
+ {
+ let InputEvent(i) = self;
+ futures::future::ok(OutputEvent(i))
+ }
+}
+
+enum State {
+ Reading,
+ Processing(
+ Box<
+ dyn futures::future::Future<Item = OutputEvent, Error = String>
+ + Send,
+ >,
+ ),
+}
+
+struct Stream {
+ input: Box<
+ dyn futures::stream::Stream<Item = InputEvent, Error = String> + Send,
+ >,
+ state: State,
+}
+
+impl Stream {
+ fn new(
+ input: Box<
+ dyn futures::stream::Stream<Item = InputEvent, Error = String>
+ + Send,
+ >,
+ ) -> Self {
+ Self {
+ input,
+ state: State::Reading,
+ }
+ }
+}
+
+impl Stream {
+ #[allow(clippy::type_complexity)]
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> component_future::Poll<
+ Option<OutputEvent>,
+ String,
+ >] = &[&Self::poll_input, &Self::poll_state];
+
+ fn poll_input(
+ &mut self,
+ ) -> component_future::Poll<Option<OutputEvent>, String> {
+ if let State::Processing(..) = self.state {
+ return Ok(component_future::Async::NothingToDo);
+ }
+
+ if let Some(input_event) =
+ component_future::try_ready!(self.input.poll())
+ {
+ self.state =
+ State::Processing(Box::new(input_event.into_output_event()));
+ Ok(component_future::Async::DidWork)
+ } else {
+ Ok(component_future::Async::Ready(None))
+ }
+ }
+
+ fn poll_state(
+ &mut self,
+ ) -> component_future::Poll<Option<OutputEvent>, String> {
+ if let State::Processing(fut) = &mut self.state {
+ let output_event = component_future::try_ready!(fut.poll());
+ self.state = State::Reading;
+ Ok(component_future::Async::Ready(Some(output_event)))
+ } else {
+ Ok(component_future::Async::NothingToDo)
+ }
+ }
+}
+
+impl futures::stream::Stream for Stream {
+ type Item = OutputEvent;
+ type Error = String;
+
+ fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
+ component_future::poll_stream(self, Self::POLL_FNS)
+ }
+}
+
+#[test]
+fn test_processing() {
+ let vals = vec![5, 3, 1, 35];
+ let stream = Stream::new(Box::new(futures::stream::iter_ok(
+ vals.clone().into_iter().map(InputEvent),
+ )));
+ let events = run::stream(stream);
+ assert_eq!(
+ events,
+ Ok(vals.clone().into_iter().map(OutputEvent).collect())
+ )
+}
diff --git a/tests/run/mod.rs b/tests/run/mod.rs
new file mode 100644
index 0000000..d64789c
--- /dev/null
+++ b/tests/run/mod.rs
@@ -0,0 +1,41 @@
+#![allow(dead_code)]
+
+use futures::future::Future as _;
+
+pub fn future<T: Send + 'static, E: Send + 'static>(
+ fut: impl futures::future::Future<Item = T, Error = E> + Send + 'static,
+) -> Result<T, E> {
+ let (wchan, rchan) = std::sync::mpsc::channel();
+ run(fut.then(move |res| {
+ wchan.send(res).unwrap();
+ futures::future::ok(())
+ }));
+ rchan.iter().next().unwrap()
+}
+
+pub fn stream<T: Send + 'static, E: Send + 'static>(
+ stream: impl futures::stream::Stream<Item = T, Error = E> + Send + 'static,
+) -> Result<Vec<T>, E> {
+ let (wchan, rchan) = std::sync::mpsc::channel();
+ let wchan_ok = wchan.clone();
+ let wchan_err = wchan.clone();
+ drop(wchan);
+ run(stream
+ .for_each(move |i| {
+ wchan_ok.send(Ok(i)).unwrap();
+ futures::future::ok(())
+ })
+ .map_err(move |e| {
+ wchan_err.send(Err(e)).unwrap();
+ }));
+ rchan.iter().collect()
+}
+
+// replacement for tokio::run which keeps panics on the main thread (so that
+// they don't get swallowed up and ignored if they happen on other threads)
+fn run(
+ fut: impl futures::future::Future<Item = (), Error = ()> + Send + 'static,
+) {
+ let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
+ runtime.block_on(fut).unwrap()
+}