diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/cmd/cast.rs | 15 | ||||
-rw-r--r-- | src/cmd/server.rs | 15 | ||||
-rw-r--r-- | src/main.rs | 2 | ||||
-rw-r--r-- | src/pb.rs | 1 | ||||
-rw-r--r-- | src/protocol.rs | 115 |
5 files changed, 140 insertions, 8 deletions
diff --git a/src/cmd/cast.rs b/src/cmd/cast.rs index bea3655..042ebd8 100644 --- a/src/cmd/cast.rs +++ b/src/cmd/cast.rs @@ -1,7 +1,13 @@ use snafu::ResultExt as _; #[derive(Debug, snafu::Snafu)] -pub enum Error {} +pub enum Error { + #[snafu(display("failed to connect: {}", source))] + Connect { source: std::io::Error }, + + #[snafu(display("failed to write message: {}", source))] + Write { source: crate::protocol::Error }, +} pub type Result<T> = std::result::Result<T, Error>; @@ -14,5 +20,10 @@ pub fn run<'a>(_matches: &clap::ArgMatches<'a>) -> super::Result<()> { } fn run_impl() -> Result<()> { - unimplemented!() + let sock = + std::net::TcpStream::connect("127.0.0.1:8000").context(Connect)?; + let msg = crate::protocol::Message::start_casting("doy"); + msg.write(sock).context(Write)?; + println!("wrote message successfully"); + Ok(()) } diff --git a/src/cmd/server.rs b/src/cmd/server.rs index 9433e83..f994ee7 100644 --- a/src/cmd/server.rs +++ b/src/cmd/server.rs @@ -25,12 +25,19 @@ fn run_impl() -> Result<()> { let listener = tokio::net::TcpListener::bind(&addr).context(Bind)?; let server = listener .incoming() - .for_each(|_sock| { - println!("got a connection"); - Ok(()) - }) .map_err(|e| { eprintln!("accept failed: {}", e); + }) + .for_each(|sock| { + crate::protocol::Message::read_async(sock) + .map(|msg| match msg { + crate::protocol::Message::StartCasting { username } => { + println!("got a connection from {}", username); + } + }) + .map_err(|e| { + eprintln!("failed to read message: {}", e); + }) }); tokio::run(server); Ok(()) diff --git a/src/main.rs b/src/main.rs index b2fb129..6a15d83 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,5 @@ mod cmd; -mod pb; +mod protocol; mod util; fn main() { diff --git a/src/pb.rs b/src/pb.rs deleted file mode 100644 index 7503dfc..0000000 --- a/src/pb.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod termcast; diff --git a/src/protocol.rs b/src/protocol.rs new file mode 100644 index 0000000..bcddc6a --- /dev/null +++ b/src/protocol.rs @@ -0,0 +1,115 @@ +use futures::future::Future as _; +use snafu::futures01::FutureExt as _; +use snafu::ResultExt as _; +use std::convert::{TryFrom as _, TryInto as _}; + +#[derive(Debug, snafu::Snafu)] +pub enum Error { + #[snafu(display("failed to read packet: {}", source))] + ReadAsync { source: tokio::io::Error }, + + #[snafu(display("failed to write packet: {}", source))] + Write { source: std::io::Error }, + + #[snafu(display("invalid StartCasting message: {}", source))] + ParseStartCastingMessage { source: std::string::FromUtf8Error }, + + #[snafu(display("invalid message type: {}", ty))] + InvalidMessageType { ty: u32 }, +} + +pub type Result<T> = std::result::Result<T, Error>; + +pub enum Message { + StartCasting { username: String }, +} + +impl Message { + pub fn start_casting(username: &str) -> Message { + Message::StartCasting { + username: username.to_string(), + } + } + + pub fn read_async<R: tokio::io::AsyncRead>( + r: R, + ) -> impl futures::future::Future<Item = Self, Error = Error> { + Packet::read_async(r).and_then(Self::try_from) + } + + pub fn write<W: std::io::Write>(&self, w: W) -> Result<()> { + Packet::from(self).write(w) + } +} + +struct Packet { + ty: u32, + data: Vec<u8>, +} + +impl Packet { + fn read_async<R: tokio::io::AsyncRead>( + r: R, + ) -> impl futures::future::Future<Item = Self, Error = Error> { + let header_buf = [0u8; std::mem::size_of::<u32>() * 2]; + tokio::io::read_exact(r, header_buf) + .and_then(|(r, buf)| { + let (len_buf, ty_buf) = + buf.split_at(std::mem::size_of::<u32>()); + let len = u32::from_le_bytes(len_buf.try_into().unwrap()); + let ty = u32::from_le_bytes(ty_buf.try_into().unwrap()); + futures::future::ok((r, len, ty)) + }) + .and_then(|(r, len, ty)| { + let body_buf = vec![0u8; len as usize]; + tokio::io::read_exact(r, body_buf) + .map(move |(_, buf)| (ty, buf)) + }) + .and_then(|(ty, buf)| { + futures::future::ok(Packet { + ty, + data: buf.to_vec(), + }) + }) + .context(ReadAsync) + } + + fn write<W: std::io::Write>(&self, mut w: W) -> Result<()> { + Ok(w.write_all(&self.as_bytes()).context(Write)?) + } + + fn as_bytes(&self) -> Vec<u8> { + let len = (self.data.len() as u32).to_le_bytes(); + let ty = self.ty.to_le_bytes(); + len.iter() + .chain(ty.iter()) + .chain(self.data.iter()) + .cloned() + .collect() + } +} + +impl From<&Message> for Packet { + fn from(msg: &Message) -> Self { + match msg { + Message::StartCasting { username } => Packet { + ty: 0, + data: username.as_bytes().to_vec(), + }, + } + } +} + +impl std::convert::TryFrom<Packet> for Message { + type Error = Error; + + fn try_from(packet: Packet) -> Result<Self> { + match packet.ty { + 0 => Ok(Message::StartCasting { + username: std::string::String::from_utf8(packet.data) + .context(ParseStartCastingMessage)?, + }), + _ => Err(Error::InvalidMessageType { ty: packet.ty }), + } + } +} |