aboutsummaryrefslogtreecommitdiffstats
path: root/src/lib.rs
blob: e49a5fb3d0c776b11c38fb6f20ac06a5fc2f9f68 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
//! Helpers for building a `futures`-style `Future` or `Stream` out of several
//! small poll functions: [`poll_future`] and [`poll_stream`] repeatedly run a
//! list of such functions and combine the [`Async`] states they report.
#![warn(clippy::pedantic)]
#![warn(clippy::nursery)]
// NOTE(review): presumably allowed because of the `poll_fns` parameters below,
// which are slices of higher-ranked `dyn Fn` references — confirm.
#![allow(clippy::type_complexity)]

/// State reported by a single component poll function to the main polling
/// loop.
///
/// The standard traits are derived so that values can be logged, compared in
/// tests and copied freely whenever `Item` allows it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Async<Item> {
    /// We have a value for the main loop to return immediately.
    Ready(Item),

    /// One of our inner futures returned `futures::Async::NotReady`. If all
    /// of our other components return either `NothingToDo` or `NotReady`,
    /// then our overall future should return `NotReady` and wait to be
    /// polled again.
    NotReady,

    /// We did some work (moved our internal state closer to being ready to
    /// return a value), but we aren't ready to return a value yet. We should
    /// re-run all of the poll functions to see if the state modification
    /// made any of them also able to make progress.
    DidWork,

    /// We didn't poll any inner futures or otherwise change our internal
    /// state at all, so rerunning is unlikely to make progress. If all
    /// components return either `NothingToDo` or `NotReady` (and at least
    /// one returned `NotReady`), then we should just return `NotReady` and
    /// wait to be polled again. It is an error (panic) for all component
    /// poll methods to return `NothingToDo`.
    NothingToDo,
}

/// Return type of a component poll function: either an [`Async`] state
/// carrying a possible `T` value, or an error of type `E`.
pub type Poll<T, E> = Result<Async<T>, E>;

/// Unwraps the result of polling an inner `futures` future or stream from
/// within a component poll function.
///
/// * `Ok(futures::Async::Ready(value))` evaluates to `value`.
/// * `Ok(futures::Async::NotReady)` makes the enclosing function return
///   `Ok($crate::Async::NotReady)`.
/// * `Err(err)` makes the enclosing function return `Err(From::from(err))`.
///
/// The expansion names `futures::Async` by a plain path, so `futures` has to
/// be resolvable at the place where the macro is invoked.
#[macro_export]
macro_rules! try_ready {
    ($poll:expr) => {
        match $poll {
            Err(err) => return Err(From::from(err)),
            Ok(futures::Async::NotReady) => {
                return Ok($crate::Async::NotReady)
            }
            Ok(futures::Async::Ready(value)) => value,
        }
    };
}

/// Polls a future that is implemented as a list of component poll functions.
///
/// Every function in `poll_fns` is called in order with `future`, and the
/// whole list is run again for as long as at least one function reports
/// [`Async::DidWork`]:
///
/// * the first [`Async::Ready`] value is returned immediately as
///   `futures::Async::Ready`;
/// * if a full pass did no work but at least one function reported
///   [`Async::NotReady`], `futures::Async::NotReady` is returned.
///
/// # Errors
///
/// Returns the first error produced by any of the poll functions; the
/// remaining functions of that pass are not called.
///
/// # Panics
///
/// Panics if a full pass ends without any function reporting `Ready`,
/// `NotReady` or `DidWork` — that is, if every function returned
/// [`Async::NothingToDo`] or `poll_fns` is empty. In that state no further
/// progress is possible.
pub fn poll_future<T, Item, Error>(
    future: &mut T,
    poll_fns: &'static [&'static dyn for<'a> Fn(
        &'a mut T,
    ) -> Poll<Item, Error>],
) -> futures::Poll<Item, Error>
where
    T: futures::future::Future<Item = Item, Error = Error>,
{
    loop {
        let mut not_ready = false;
        let mut did_work = false;

        for f in poll_fns {
            match f(future)? {
                Async::Ready(e) => return Ok(futures::Async::Ready(e)),
                Async::NotReady => not_ready = true,
                Async::NothingToDo => {}
                Async::DidWork => did_work = true,
            }
        }

        // State changed during this pass, so another function may now be
        // able to make progress: run the whole list again.
        if did_work {
            continue;
        }

        if not_ready {
            return Ok(futures::Async::NotReady);
        }

        // Reachable through caller error, so report it with a real message
        // instead of claiming the code is unreachable.
        panic!(
            "poll_future: no poll function made progress or reported NotReady \
             (all returned NothingToDo, or `poll_fns` is empty)"
        );
    }
}

/// Polls a stream that is implemented as a list of component poll functions.
///
/// Every function in `poll_fns` is called in order with `stream`, and the
/// whole list is run again for as long as at least one function reports
/// [`Async::DidWork`]:
///
/// * the first [`Async::Ready`] value (`Some(item)` or `None`) is returned
///   immediately, unchanged, as `futures::Async::Ready`;
/// * if a full pass did no work but at least one function reported
///   [`Async::NotReady`], `futures::Async::NotReady` is returned.
///
/// # Errors
///
/// Returns the first error produced by any of the poll functions; the
/// remaining functions of that pass are not called.
///
/// # Panics
///
/// Panics if a full pass ends without any function reporting `Ready`,
/// `NotReady` or `DidWork` — that is, if every function returned
/// [`Async::NothingToDo`] or `poll_fns` is empty. In that state no further
/// progress is possible.
pub fn poll_stream<T, Item, Error>(
    stream: &mut T,
    poll_fns: &'static [&'static dyn for<'a> Fn(
        &'a mut T,
    ) -> Poll<
        Option<Item>,
        Error,
    >],
) -> futures::Poll<Option<Item>, Error>
where
    T: futures::stream::Stream<Item = Item, Error = Error>,
{
    loop {
        let mut not_ready = false;
        let mut did_work = false;

        for f in poll_fns {
            match f(stream)? {
                Async::Ready(e) => return Ok(futures::Async::Ready(e)),
                Async::NotReady => not_ready = true,
                Async::NothingToDo => {}
                Async::DidWork => did_work = true,
            }
        }

        // State changed during this pass, so another function may now be
        // able to make progress: run the whole list again.
        if did_work {
            continue;
        }

        if not_ready {
            return Ok(futures::Async::NotReady);
        }

        // Reachable through caller error, so report it with a real message
        // instead of claiming the code is unreachable.
        panic!(
            "poll_stream: no poll function made progress or reported NotReady \
             (all returned NothingToDo, or `poll_fns` is empty)"
        );
    }
}