aboutsummaryrefslogtreecommitdiffstats
path: root/tests/run/mod.rs
blob: d64789c89835992da0c4cb1e12afc056d83d793b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#![allow(dead_code)]

use futures::future::Future as _;

/// Drives `fut` to completion via `run` and returns the result it resolved
/// with.
///
/// The outcome (success or failure) is sent through a channel from inside the
/// driven future and read back once `run` has returned, so the value handed
/// to the caller is exactly what `fut` produced.
///
/// # Panics
///
/// Panics if the outcome cannot be sent, or if no outcome is available on the
/// channel after `run` returns.
pub fn future<T: Send + 'static, E: Send + 'static>(
    fut: impl futures::future::Future<Item = T, Error = E> + Send + 'static,
) -> Result<T, E> {
    let (tx, rx) = std::sync::mpsc::channel();
    // Always resolve to Ok(()) so that `run` sees a successful unit future;
    // the real outcome travels through the channel instead.
    let forwarded = fut.then(move |outcome| {
        tx.send(outcome).unwrap();
        futures::future::ok(())
    });
    run(forwarded);
    rx.into_iter().next().unwrap()
}

/// Drives `stream` to completion via `run` and gathers everything it yields.
///
/// Each item is forwarded through a channel as `Ok(item)`; if the stream
/// fails, the error is forwarded as `Err(error)`. The received values are
/// then collected into a `Result<Vec<T>, E>`, so a successful stream returns
/// all of its items in order and a failed stream returns its error.
///
/// # Panics
///
/// Panics if a value cannot be sent through the channel.
pub fn stream<T: Send + 'static, E: Send + 'static>(
    stream: impl futures::stream::Stream<Item = T, Error = E> + Send + 'static,
) -> Result<Vec<T>, E> {
    let (wchan, rchan) = std::sync::mpsc::channel();
    let wchan_err = wchan.clone();
    run(stream
        .for_each(move |i| {
            wchan.send(Ok(i)).unwrap();
            futures::future::ok(())
        })
        // Report the failure through the channel and then resolve
        // successfully: `run` unwraps the result of the driven future, so
        // leaving it as `Err(())` would panic before the error could be
        // handed back to the caller.
        .or_else(move |e| {
            wchan_err.send(Err(e)).unwrap();
            futures::future::ok(())
        }));
    // Both senders were moved into the future that `run` took ownership of,
    // so the channel is closed by now and this iteration terminates.
    rchan.iter().collect()
}

// stand-in for tokio::run that drives the future on the calling thread using
// a current_thread runtime, so that panics stay on the main thread (rather
// than being swallowed up and ignored because they happened on some other
// thread)
//
// panics if the runtime cannot be created or if the future resolves to an
// error
fn run(
    fut: impl futures::future::Future<Item = (), Error = ()> + Send + 'static,
) {
    let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap();
    let outcome = rt.block_on(fut);
    outcome.unwrap()
}