1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
use futures::future::Future as _;
use snafu::futures01::FutureExt as _;
use snafu::ResultExt as _;
use std::convert::{TryFrom as _, TryInto as _};
#[derive(Debug, snafu::Snafu)]
pub enum Error {
#[snafu(display("failed to read packet: {}", source))]
ReadAsync { source: tokio::io::Error },
#[snafu(display("failed to write packet: {}", source))]
Write { source: std::io::Error },
#[snafu(display("invalid StartCasting message: {}", source))]
ParseStartCastingMessage { source: std::string::FromUtf8Error },
#[snafu(display("invalid message type: {}", ty))]
InvalidMessageType { ty: u32 },
}
pub type Result<T> = std::result::Result<T, Error>;
pub enum Message {
StartCasting { username: String },
}
impl Message {
pub fn start_casting(username: &str) -> Message {
Message::StartCasting {
username: username.to_string(),
}
}
pub fn read_async<R: tokio::io::AsyncRead>(
r: R,
) -> impl futures::future::Future<Item = Self, Error = Error> {
Packet::read_async(r).and_then(Self::try_from)
}
pub fn write<W: std::io::Write>(&self, w: W) -> Result<()> {
Packet::from(self).write(w)
}
}
struct Packet {
ty: u32,
data: Vec<u8>,
}
impl Packet {
fn read_async<R: tokio::io::AsyncRead>(
r: R,
) -> impl futures::future::Future<Item = Self, Error = Error> {
let header_buf = [0u8; std::mem::size_of::<u32>() * 2];
tokio::io::read_exact(r, header_buf)
.and_then(|(r, buf)| {
let (len_buf, ty_buf) =
buf.split_at(std::mem::size_of::<u32>());
let len = u32::from_le_bytes(len_buf.try_into().unwrap());
let ty = u32::from_le_bytes(ty_buf.try_into().unwrap());
futures::future::ok((r, len, ty))
})
.and_then(|(r, len, ty)| {
let body_buf = vec![0u8; len as usize];
tokio::io::read_exact(r, body_buf)
.map(move |(_, buf)| (ty, buf))
})
.and_then(|(ty, buf)| {
futures::future::ok(Packet {
ty,
data: buf.to_vec(),
})
})
.context(ReadAsync)
}
fn write<W: std::io::Write>(&self, mut w: W) -> Result<()> {
Ok(w.write_all(&self.as_bytes()).context(Write)?)
}
fn as_bytes(&self) -> Vec<u8> {
let len = (self.data.len() as u32).to_le_bytes();
let ty = self.ty.to_le_bytes();
len.iter()
.chain(ty.iter())
.chain(self.data.iter())
.cloned()
.collect()
}
}
impl From<&Message> for Packet {
fn from(msg: &Message) -> Self {
match msg {
Message::StartCasting { username } => Packet {
ty: 0,
data: username.as_bytes().to_vec(),
},
}
}
}
impl std::convert::TryFrom<Packet> for Message {
type Error = Error;
fn try_from(packet: Packet) -> Result<Self> {
match packet.ty {
0 => Ok(Message::StartCasting {
username: std::string::String::from_utf8(packet.data)
.context(ParseStartCastingMessage)?,
}),
_ => Err(Error::InvalidMessageType { ty: packet.ty }),
}
}
}
|