aboutsummaryrefslogtreecommitdiffstats
path: root/tests/processing.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tests/processing.rs')
-rw-r--r--tests/processing.rs111
1 files changed, 111 insertions, 0 deletions
diff --git a/tests/processing.rs b/tests/processing.rs
new file mode 100644
index 0000000..4446dc6
--- /dev/null
+++ b/tests/processing.rs
@@ -0,0 +1,111 @@
+mod run;
+
+#[derive(Debug, PartialEq, Eq)]
+struct InputEvent(u32);
+#[derive(Debug, PartialEq, Eq)]
+struct OutputEvent(u32);
+
+impl InputEvent {
+ fn into_output_event(
+ self,
+ ) -> impl futures::future::Future<Item = OutputEvent, Error = String>
+ {
+ let InputEvent(i) = self;
+ futures::future::ok(OutputEvent(i))
+ }
+}
+
+enum State {
+ Reading,
+ Processing(
+ Box<
+ dyn futures::future::Future<Item = OutputEvent, Error = String>
+ + Send,
+ >,
+ ),
+}
+
+struct Stream {
+ input: Box<
+ dyn futures::stream::Stream<Item = InputEvent, Error = String> + Send,
+ >,
+ state: State,
+}
+
+impl Stream {
+ fn new(
+ input: Box<
+ dyn futures::stream::Stream<Item = InputEvent, Error = String>
+ + Send,
+ >,
+ ) -> Self {
+ Self {
+ input,
+ state: State::Reading,
+ }
+ }
+}
+
+impl Stream {
+ #[allow(clippy::type_complexity)]
+ const POLL_FNS:
+ &'static [&'static dyn for<'a> Fn(
+ &'a mut Self,
+ )
+ -> component_future::Poll<
+ Option<OutputEvent>,
+ String,
+ >] = &[&Self::poll_input, &Self::poll_state];
+
+ fn poll_input(
+ &mut self,
+ ) -> component_future::Poll<Option<OutputEvent>, String> {
+ if let State::Processing(..) = self.state {
+ return Ok(component_future::Async::NothingToDo);
+ }
+
+ if let Some(input_event) =
+ component_future::try_ready!(self.input.poll())
+ {
+ self.state =
+ State::Processing(Box::new(input_event.into_output_event()));
+ Ok(component_future::Async::DidWork)
+ } else {
+ Ok(component_future::Async::Ready(None))
+ }
+ }
+
+ fn poll_state(
+ &mut self,
+ ) -> component_future::Poll<Option<OutputEvent>, String> {
+ if let State::Processing(fut) = &mut self.state {
+ let output_event = component_future::try_ready!(fut.poll());
+ self.state = State::Reading;
+ Ok(component_future::Async::Ready(Some(output_event)))
+ } else {
+ Ok(component_future::Async::NothingToDo)
+ }
+ }
+}
+
+impl futures::stream::Stream for Stream {
+ type Item = OutputEvent;
+ type Error = String;
+
+ fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
+ component_future::poll_stream(self, Self::POLL_FNS)
+ }
+}
+
+#[test]
+fn test_processing() {
+ let vals = vec![5, 3, 1, 35];
+ let stream = Stream::new(Box::new(futures::stream::iter_ok(
+ vals.clone().into_iter().map(InputEvent),
+ )));
+ let events = run::stream(stream);
+ assert_eq!(
+ events,
+ Ok(vals.clone().into_iter().map(OutputEvent).collect())
+ )
+}