aboutsummaryrefslogtreecommitdiffstats
path: root/src/protocol.rs
blob: bcddc6ab9b5b31fa00c16177d63133c20434dc1e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use futures::future::Future as _;
use snafu::futures01::FutureExt as _;
use snafu::ResultExt as _;
use std::convert::{TryFrom as _, TryInto as _};

/// Errors produced while reading, writing, or decoding protocol messages.
#[derive(Debug, snafu::Snafu)]
pub enum Error {
    /// The asynchronous read of a packet (header or body) failed with the
    /// wrapped I/O error.
    #[snafu(display("failed to read packet: {}", source))]
    ReadAsync { source: tokio::io::Error },

    /// Writing the serialized packet to the output failed with the wrapped
    /// I/O error.
    #[snafu(display("failed to write packet: {}", source))]
    Write { source: std::io::Error },

    /// A packet of type 0 (`StartCasting`) carried a payload that is not
    /// valid UTF-8.
    #[snafu(display("invalid StartCasting message: {}", source))]
    ParseStartCastingMessage { source: std::string::FromUtf8Error },

    /// A packet carried a type tag that does not correspond to any known
    /// message; `ty` is the offending tag.
    #[snafu(display("invalid message type: {}", ty))]
    InvalidMessageType { ty: u32 },
}

/// Shorthand for a `std::result::Result` whose error type is this module's
/// [`Error`].
pub type Result<T> = std::result::Result<T, Error>;

/// A decoded protocol message.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so that messages can be
/// logged, duplicated, and compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Message {
    /// Sent with the name of the user who is starting to cast. On the wire
    /// this is packet type 0 with the UTF-8 bytes of `username` as payload.
    StartCasting { username: String },
}

impl Message {
    pub fn start_casting(username: &str) -> Message {
        Message::StartCasting {
            username: username.to_string(),
        }
    }

    pub fn read_async<R: tokio::io::AsyncRead>(
        r: R,
    ) -> impl futures::future::Future<Item = Self, Error = Error> {
        Packet::read_async(r).and_then(Self::try_from)
    }

    pub fn write<W: std::io::Write>(&self, w: W) -> Result<()> {
        Packet::from(self).write(w)
    }
}

/// A raw protocol frame.
///
/// On the wire a packet is an 8-byte header (little-endian `u32` payload
/// length followed by a little-endian `u32` type tag) and then the payload
/// bytes.
struct Packet {
    /// Type tag identifying which `Message` variant the payload encodes.
    ty: u32,
    /// Payload bytes, excluding the header.
    data: Vec<u8>,
}

impl Packet {
    fn read_async<R: tokio::io::AsyncRead>(
        r: R,
    ) -> impl futures::future::Future<Item = Self, Error = Error> {
        let header_buf = [0u8; std::mem::size_of::<u32>() * 2];
        tokio::io::read_exact(r, header_buf)
            .and_then(|(r, buf)| {
                let (len_buf, ty_buf) =
                    buf.split_at(std::mem::size_of::<u32>());
                let len = u32::from_le_bytes(len_buf.try_into().unwrap());
                let ty = u32::from_le_bytes(ty_buf.try_into().unwrap());
                futures::future::ok((r, len, ty))
            })
            .and_then(|(r, len, ty)| {
                let body_buf = vec![0u8; len as usize];
                tokio::io::read_exact(r, body_buf)
                    .map(move |(_, buf)| (ty, buf))
            })
            .and_then(|(ty, buf)| {
                futures::future::ok(Packet {
                    ty,
                    data: buf.to_vec(),
                })
            })
            .context(ReadAsync)
    }

    fn write<W: std::io::Write>(&self, mut w: W) -> Result<()> {
        Ok(w.write_all(&self.as_bytes()).context(Write)?)
    }

    fn as_bytes(&self) -> Vec<u8> {
        let len = (self.data.len() as u32).to_le_bytes();
        let ty = self.ty.to_le_bytes();
        len.iter()
            .chain(ty.iter())
            .chain(self.data.iter())
            .cloned()
            .collect()
    }
}

impl From<&Message> for Packet {
    /// Encodes a message as a packet: `StartCasting` becomes type tag 0 with
    /// the UTF-8 bytes of the username as payload.
    fn from(msg: &Message) -> Self {
        let (ty, data) = match msg {
            Message::StartCasting { username } => {
                (0, Vec::from(username.as_bytes()))
            }
        };
        Self { ty, data }
    }
}

impl std::convert::TryFrom<Packet> for Message {
    type Error = Error;

    fn try_from(packet: Packet) -> Result<Self> {
        match packet.ty {
            0 => Ok(Message::StartCasting {
                username: std::string::String::from_utf8(packet.data)
                    .context(ParseStartCastingMessage)?,
            }),
            _ => Err(Error::InvalidMessageType { ty: packet.ty }),
        }
    }
}