aboutsummaryrefslogblamecommitdiffstats
path: root/src/lib.rs
blob: df145e97b5f9d39aaada06a8d3407a297d82c7f1 (plain) (tree)


























































































                                                                               

                              

                          
                                          

                                  

                                                              

                                                                          
                                            
                      
                                                                

                



                                                                              

             



                                                                              

            





                                                                              


                











                                                                            

                                                        




                                                                             












                                                  





























                                                                             
                                       
                   
                                                                       


























                                                                       





























                                                                              
                                       
                   


                                      























                                                                       

     
//! This crate implements the inner future protocol documented in [the tokio
//! docs](https://tokio.rs/docs/futures/getting_asynchronous/).
//!
//! # Overview
//!
//! If you are implementing a complicated future or stream which contains many
//! inner futures and streams, it can be difficult to keep track of when
//! looping is necessary, when it is safe to return `NotReady`, etc. This
//! provides an interface similar to the existing `poll` interface for futures
//! and streams, but extended to include additional state about how the inner
//! future or stream affected the outer future or stream's state. This allows
//! you to easily split your `poll` implementation into multiple methods, and
//! ensure that they interact properly.
//!
//! # Synopsis
//!
//! ```
//! enum OutputEvent {
//!     // ...
//! }
//!
//! struct Server {
//!     // ...
//! #   some_future: Box<
//! #       dyn futures::future::Future<Item = OutputEvent, Error = String>
//! #           + Send,
//! #   >,
//! #   other_future: Option<
//! #       Box<
//! #           dyn futures::future::Future<Item = OutputEvent, Error = String>
//! #               + Send,
//! #       >,
//! #   >,
//! }
//!
//! impl Server {
//!     fn process_thing(&self, thing: OutputEvent) {
//!         // ...
//!     }
//!
//!     fn process_other_thing(&self, thing: OutputEvent) -> OutputEvent {
//!         // ...
//! #       unimplemented!()
//!     }
//! }
//!
//! impl Server {
//!     const POLL_FNS:
//!         &'static [&'static dyn for<'a> Fn(
//!             &'a mut Self,
//!         )
//!             -> component_future::Poll<
//!             Option<OutputEvent>,
//!             String,
//!         >] = &[&Self::poll_thing, &Self::poll_other_thing];
//!
//!     fn poll_thing(
//!         &mut self,
//!     ) -> component_future::Poll<Option<OutputEvent>, String> {
//!         let thing = component_future::try_ready!(self.some_future.poll());
//!         self.process_thing(thing);
//!         Ok(component_future::Async::DidWork)
//!     }
//!
//!     fn poll_other_thing(
//!         &mut self,
//!     ) -> component_future::Poll<Option<OutputEvent>, String> {
//!         if let Some(other_future) = &mut self.other_future {
//!             let other_thing = component_future::try_ready!(
//!                 other_future.poll()
//!             );
//!             let processed_thing = self.process_other_thing(other_thing);
//!             self.other_future.take();
//!             Ok(component_future::Async::Ready(Some(processed_thing)))
//!         }
//!         else {
//!             Ok(component_future::Async::NothingToDo)
//!         }
//!     }
//! }
//!
//! impl futures::stream::Stream for Server {
//!     type Item = OutputEvent;
//!     type Error = String;
//!
//!     fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
//!         component_future::poll_stream(self, Self::POLL_FNS)
//!     }
//! }
//! ```

// XXX this is broken with ale
// #![warn(clippy::cargo)]
#![warn(clippy::pedantic)]
#![warn(clippy::nursery)]
#![allow(clippy::multiple_crate_versions)]
#![allow(clippy::type_complexity)]

// Embeds the text of this crate's `Cargo.toml` (one directory above this
// file) into the binary at compile time via `include_str!`. The constant is
// not read anywhere in the visible source; the leading underscore keeps it
// from triggering an unused-item warning.
// NOTE(review): presumably this exists so that editing `Cargo.toml` forces
// this crate to be recompiled (and re-linted) -- confirm before removing.
const _DUMMY_DEPENDENCY: &str = include_str!("../Cargo.toml");

/// Outcome reported by a single component poll method of a future or
/// stream: either a finished value, or a description of what (if anything)
/// happened while polling.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum Async<Item> {
    /// A value is available and the main loop should return it right away.
    Ready(Item),

    /// An inner future reported `futures::Async::NotReady`. When every other
    /// component reports `NothingToDo` or `NotReady` as well, the overall
    /// future should itself return `NotReady` and wait to be polled again.
    NotReady,

    /// Internal state advanced towards producing a value, but no value is
    /// available yet. All poll functions should be run again, because the
    /// state change may have unblocked one of them.
    DidWork,

    /// No inner future was polled and no internal state changed, so running
    /// again is unlikely to help. When all components report `NothingToDo`
    /// or `NotReady` (with at least one `NotReady`), the overall future
    /// should return `NotReady` and wait to be polled again. Having every
    /// component poll method report `NothingToDo` is an error and results
    /// in a panic.
    NothingToDo,
}

/// Each component poll method should return a value of this type.
///
/// * `Ok(Async::Ready(t))` means that the overall future or stream is ready
///   to return a value.
/// * `Ok(Async::NotReady)` means that a poll method called by one of the
///   component futures or streams returned `NotReady`, and so it's safe for
///   the overall future or stream to also return `NotReady`.
/// * `Ok(Async::DidWork)` means that the overall future made progress by
///   updating its internal state, but isn't yet ready to return a value.
/// * `Ok(Async::NothingToDo)` means that no work was done at all.
/// * `Err(e)` means that the overall future or stream is ready to return an
///   error.
pub type Poll<Item, Error> = Result<Async<Item>, Error>;

/// Unwraps the ready value of a `futures::Poll<T, E>` expression inside a
/// function that returns a `component_future::Poll<T, E>`.
///
/// * `Ok(futures::Async::Ready(t))` evaluates to `t`.
/// * `Ok(futures::Async::NotReady)` makes the enclosing function return
///   `Ok(component_future::Async::NotReady)`.
/// * `Err(e)` makes the enclosing function return `Err`, with the error
///   converted through `From::from`.
#[macro_export]
macro_rules! try_ready {
    ($poll:expr) => {
        match $poll {
            Err(err) => return Err(From::from(err)),
            Ok(futures::Async::NotReady) => {
                return Ok($crate::Async::NotReady)
            }
            Ok(futures::Async::Ready(value)) => value,
        }
    };
}

/// The body of a `futures::future::Future::poll` method.
///
/// Calls every function in `poll_fns` on `future`, in slice order, and
/// repeats the whole pass for as long as at least one of them reports
/// `Ok(Async::DidWork)`:
///
/// * As soon as a component returns `Ok(Async::Ready(t))`, this returns
///   `Ok(futures::Async::Ready(t))`; the remaining components are not
///   called in that pass.
/// * As soon as a component returns `Err(e)`, this returns `Err(e)`.
/// * After a complete pass in which no component reported `DidWork` and at
///   least one reported `Ok(Async::NotReady)`, this returns
///   `Ok(futures::Async::NotReady)`.
///
/// # Panics
///
/// Panics if a complete pass ends with every component poll method having
/// returned `Ok(Async::NothingToDo)`. This includes the case where
/// `poll_fns` is empty.
///
/// # Examples
///
/// ```
/// # use futures::future::Future;
/// # struct Foo;
/// # impl Foo {
/// #     const POLL_FNS:
/// #         &'static [&'static dyn for<'a> Fn(
/// #             &'a mut Self,
/// #         ) -> component_future::Poll<(), ()>] = &[];
/// # }
/// impl Future for Foo {
///     type Item = ();
///     type Error = ();
///
///     fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
///         component_future::poll_future(self, Self::POLL_FNS)
///     }
/// }
/// ```
pub fn poll_future<'a, T, Item, Error>(
    future: &mut T,
    poll_fns: &'a [&'a dyn for<'b> Fn(&'b mut T) -> Poll<Item, Error>],
) -> futures::Poll<Item, Error>
where
    T: futures::future::Future<Item = Item, Error = Error>,
{
    loop {
        let mut not_ready = false;
        let mut did_work = false;

        for f in poll_fns {
            // `?` propagates a component error to the caller immediately.
            match f(future)? {
                Async::Ready(item) => {
                    return Ok(futures::Async::Ready(item));
                }
                Async::NotReady => not_ready = true,
                Async::NothingToDo => {}
                Async::DidWork => did_work = true,
            }
        }

        // A state change may have unblocked another component, so run the
        // whole pass again before deciding anything.
        if did_work {
            continue;
        }

        if not_ready {
            return Ok(futures::Async::NotReady);
        }

        // This path is reachable (for example with an empty `poll_fns`), so
        // report it with a descriptive message rather than `unreachable!`.
        // No component reported `NotReady`, so returning `NotReady` here
        // would not be backed by any inner future having been polled.
        panic!(
            "component_future::poll_future: every component poll method \
             returned NothingToDo"
        );
    }
}

/// The body of a `futures::stream::Stream::poll` method.
///
/// Calls every function in `poll_fns` on `stream`, in slice order, and
/// repeats the whole pass for as long as at least one of them reports
/// `Ok(Async::DidWork)`:
///
/// * As soon as a component returns `Ok(Async::Ready(item))`, this returns
///   `Ok(futures::Async::Ready(item))` with the `Option` passed through
///   unchanged; the remaining components are not called in that pass.
/// * As soon as a component returns `Err(e)`, this returns `Err(e)`.
/// * After a complete pass in which no component reported `DidWork` and at
///   least one reported `Ok(Async::NotReady)`, this returns
///   `Ok(futures::Async::NotReady)`.
///
/// # Panics
///
/// Panics if a complete pass ends with every component poll method having
/// returned `Ok(Async::NothingToDo)`. This includes the case where
/// `poll_fns` is empty.
///
/// # Examples
///
/// ```
/// # use futures::stream::Stream;
/// # struct Foo;
/// # impl Foo {
/// #     const POLL_FNS:
/// #         &'static [&'static dyn for<'a> Fn(
/// #             &'a mut Self,
/// #         ) -> component_future::Poll<Option<()>, ()>] = &[];
/// # }
/// impl Stream for Foo {
///     type Item = ();
///     type Error = ();
///
///     fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
///         component_future::poll_stream(self, Self::POLL_FNS)
///     }
/// }
/// ```
pub fn poll_stream<'a, T, Item, Error>(
    stream: &mut T,
    poll_fns: &'a [&'a dyn for<'b> Fn(
        &'b mut T,
    ) -> Poll<Option<Item>, Error>],
) -> futures::Poll<Option<Item>, Error>
where
    T: futures::stream::Stream<Item = Item, Error = Error>,
{
    loop {
        let mut not_ready = false;
        let mut did_work = false;

        for f in poll_fns {
            // `?` propagates a component error to the caller immediately.
            match f(stream)? {
                Async::Ready(item) => {
                    return Ok(futures::Async::Ready(item));
                }
                Async::NotReady => not_ready = true,
                Async::NothingToDo => {}
                Async::DidWork => did_work = true,
            }
        }

        // A state change may have unblocked another component, so run the
        // whole pass again before deciding anything.
        if did_work {
            continue;
        }

        if not_ready {
            return Ok(futures::Async::NotReady);
        }

        // This path is reachable (for example with an empty `poll_fns`), so
        // report it with a descriptive message rather than `unreachable!`.
        // No component reported `NotReady`, so returning `NotReady` here
        // would not be backed by any inner stream having been polled.
        panic!(
            "component_future::poll_stream: every component poll method \
             returned NothingToDo"
        );
    }
}