//! This crate implements the inner future protocol documented in [the tokio //! docs](https://tokio.rs/docs/futures/getting_asynchronous/). //! //! # Overview //! //! If you are implementing a complicated future or stream which contains many //! inner futures and streams, it can be difficult to keep track of when //! looping is necessary, when it is safe to return `NotReady`, etc. This //! provides an interface similar to the existing `poll` interface for futures //! and streams, but extended to include additional state about how the inner //! future or stream affected the outer future or stream's state. This allows //! you to easily split your `poll` implementation into multiple methods, and //! ensure that they interact properly. //! //! # Synopsis //! //! ``` //! enum OutputEvent { //! // ... //! } //! //! struct Server { //! // ... //! # some_future: Box< //! # dyn futures::future::Future //! # + Send, //! # >, //! # other_future: Option< //! # Box< //! # dyn futures::future::Future //! # + Send, //! # >, //! # >, //! } //! //! impl Server { //! fn process_thing(&self, thing: OutputEvent) { //! // ... //! } //! //! fn process_other_thing(&self, thing: OutputEvent) -> OutputEvent { //! // ... //! # unimplemented!() //! } //! } //! //! impl Server { //! const POLL_FNS: //! &'static [&'static dyn for<'a> Fn( //! &'a mut Self, //! ) //! -> component_future::Poll< //! Option, //! String, //! >] = &[&Self::poll_thing, &Self::poll_other_thing]; //! //! fn poll_thing( //! &mut self, //! ) -> component_future::Poll, String> { //! let thing = component_future::try_ready!(self.some_future.poll()); //! self.process_thing(thing); //! Ok(component_future::Async::DidWork) //! } //! //! fn poll_other_thing( //! &mut self, //! ) -> component_future::Poll, String> { //! if let Some(other_future) = &mut self.other_future { //! let other_thing = component_future::try_ready!( //! other_future.poll() //! ); //! let processed_thing = self.process_other_thing(other_thing); //! 
self.other_future.take(); //! Ok(component_future::Async::Ready(Some(processed_thing))) //! } //! else { //! Ok(component_future::Async::NothingToDo) //! } //! } //! } //! //! impl futures::stream::Stream for Server { //! type Item = OutputEvent; //! type Error = String; //! //! fn poll(&mut self) -> futures::Poll, Self::Error> { //! component_future::poll_stream(self, Self::POLL_FNS) //! } //! } //! ``` // XXX this is broken with ale // #![warn(clippy::cargo)] #![warn(clippy::pedantic)] #![warn(clippy::nursery)] #![allow(clippy::multiple_crate_versions)] #![allow(clippy::type_complexity)] const _DUMMY_DEPENDENCY: &str = include_str!("../Cargo.toml"); /// Return type of a component of a future or stream, indicating whether a /// value is ready, or if not, what actions were taken. #[derive(Copy, Clone, Debug, PartialEq, Eq)] pub enum Async { /// We have a value for the main loop to return immediately. Ready(Item), /// One of our inner futures returned `futures::Async::NotReady`. If all /// of our other components return either `NothingToDo` or `NotReady`, /// then our overall future should return `NotReady` and wait to be polled /// again. NotReady, /// We did some work (moved our internal state closer to being ready to /// return a value), but we aren't ready to return a value yet. We should /// re-run all of the poll functions to see if the state modification made /// any of them also able to make progress. DidWork, /// We didn't poll any inner futures or otherwise change our internal /// state at all, so rerunning is unlikely to make progress. If all /// components return either `NothingToDo` or `NotReady` (and at least one /// returns `NotReady`), then we should just return `NotReady` and wait to /// be polled again. It is an error (panic) for all component poll methods /// to return `NothingToDo`. NothingToDo, } /// Each component poll method should return a value of this type. 
///
/// * `Ok(Async::Ready(t))` means that the overall future or stream is ready
///   to return a value.
/// * `Ok(Async::NotReady)` means that a poll method called by one of the
///   component futures or streams returned `NotReady`, and so it's safe for
///   the overall future or stream to also return `NotReady`.
/// * `Ok(Async::DidWork)` means that the overall future made progress by
///   updating its internal state, but isn't yet ready to return a value.
/// * `Ok(Async::NothingToDo)` means that no work was done at all.
/// * `Err(e)` means that the overall future or stream is ready to return an
///   error.
pub type Poll<Item, Error> = Result<Async<Item>, Error>;

/// A macro for extracting the successful type of a `futures::Poll` and
/// turning it into a `component_future::Poll`.
///
/// This macro propagates both errors and `NotReady` values by returning
/// early. Errors are converted with `From::from`, so the inner error type
/// only needs to be convertible into the component's error type.
#[macro_export]
macro_rules! try_ready {
    ($e:expr) => {
        match $e {
            // The inner future or stream produced a value: hand it back to
            // the caller as the value of the macro expression.
            Ok(futures::Async::Ready(t)) => t,
            // The inner poll is pending: report that from the enclosing
            // component poll method.
            Ok(futures::Async::NotReady) => {
                return Ok($crate::Async::NotReady)
            }
            Err(e) => return Err(From::from(e)),
        }
    };
}

/// The body of a `futures::future::Future::poll` method.
///
/// It will repeatedly call the given component poll functions until none of
/// them returns `Ok(Async::Ready(t))`, `Ok(Async::DidWork)`, or `Err(e)` and
/// at least one of them returns `Ok(Async::NotReady)`.
///
/// # Panics
///
/// Panics if all component poll methods return `Ok(Async::NothingToDo)`.
///
/// # Examples
///
/// ```
/// # use futures::future::Future;
/// # struct Foo;
/// # impl Foo {
/// #     const POLL_FNS:
/// #         &'static [&'static dyn for<'a> Fn(
/// #             &'a mut Self,
/// #         ) -> component_future::Poll<(), ()>] = &[];
/// # }
/// impl Future for Foo {
///     type Item = ();
///     type Error = ();
///
///     fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
///         component_future::poll_future(self, Self::POLL_FNS)
///     }
/// }
/// ```
pub fn poll_future<'a, T, Item, Error>(
    future: &mut T,
    poll_fns: &'a [&'a dyn for<'b> Fn(&'b mut T) -> Poll<Item, Error>],
) -> futures::Poll<Item, Error>
where
    T: futures::future::Future<Item = Item, Error = Error>,
{
    loop {
        let mut not_ready = false;
        let mut did_work = false;

        // One full pass over every component. `?` returns the first error
        // straight away, and a `Ready` value ends the poll immediately.
        for f in poll_fns {
            match f(future)? {
                Async::Ready(e) => return Ok(futures::Async::Ready(e)),
                Async::NotReady => not_ready = true,
                Async::NothingToDo => {}
                Async::DidWork => did_work = true,
            }
        }

        // A component changed state, so another component may now be able to
        // make progress: run the whole pass again.
        if did_work {
            continue;
        }

        if not_ready {
            return Ok(futures::Async::NotReady);
        }

        // Nothing was polled and nothing changed. Returning `NotReady` here
        // would be wrong because no inner poll reported pending, so this is
        // treated as a caller bug.
        panic!(
            "poll_future: every component poll function returned \
             `NothingToDo`; at least one must return a value, do work, or \
             report `NotReady`"
        );
    }
}

/// The body of a `futures::stream::Stream::poll` method.
///
/// It will repeatedly call the given component poll functions until none of
/// them returns `Ok(Async::Ready(t))`, `Ok(Async::DidWork)`, or `Err(e)` and
/// at least one of them returns `Ok(Async::NotReady)`.
///
/// # Panics
///
/// Panics if all component poll methods return `Ok(Async::NothingToDo)`.
/// /// # Examples /// /// ``` /// # use futures::stream::Stream; /// # struct Foo; /// # impl Foo { /// # const POLL_FNS: /// # &'static [&'static dyn for<'a> Fn( /// # &'a mut Self, /// # ) -> component_future::Poll, ()>] = &[]; /// # } /// impl Stream for Foo { /// type Item = (); /// type Error = (); /// /// fn poll(&mut self) -> futures::Poll, Self::Error> { /// component_future::poll_stream(self, Self::POLL_FNS) /// } /// } /// ``` pub fn poll_stream<'a, T, Item, Error>( stream: &mut T, poll_fns: &'a [&'a dyn for<'b> Fn( &'b mut T, ) -> Poll, Error>], ) -> futures::Poll, Error> where T: futures::stream::Stream, { loop { let mut not_ready = false; let mut did_work = false; for f in poll_fns { match f(stream)? { Async::Ready(e) => return Ok(futures::Async::Ready(e)), Async::NotReady => not_ready = true, Async::NothingToDo => {} Async::DidWork => did_work = true, } } if !did_work { if not_ready { return Ok(futures::Async::NotReady); } else { unreachable!() } } } }