aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/cmd/cast.rs15
-rw-r--r--src/cmd/server.rs15
-rw-r--r--src/main.rs2
-rw-r--r--src/pb.rs1
-rw-r--r--src/protocol.rs115
5 files changed, 140 insertions, 8 deletions
diff --git a/src/cmd/cast.rs b/src/cmd/cast.rs
index bea3655..042ebd8 100644
--- a/src/cmd/cast.rs
+++ b/src/cmd/cast.rs
@@ -1,7 +1,13 @@
use snafu::ResultExt as _;
#[derive(Debug, snafu::Snafu)]
-pub enum Error {}
+pub enum Error {
+ #[snafu(display("failed to connect: {}", source))]
+ Connect { source: std::io::Error },
+
+ #[snafu(display("failed to write message: {}", source))]
+ Write { source: crate::protocol::Error },
+}
pub type Result<T> = std::result::Result<T, Error>;
@@ -14,5 +20,10 @@ pub fn run<'a>(_matches: &clap::ArgMatches<'a>) -> super::Result<()> {
}
fn run_impl() -> Result<()> {
- unimplemented!()
+ let sock =
+ std::net::TcpStream::connect("127.0.0.1:8000").context(Connect)?;
+ let msg = crate::protocol::Message::start_casting("doy");
+ msg.write(sock).context(Write)?;
+ println!("wrote message successfully");
+ Ok(())
}
diff --git a/src/cmd/server.rs b/src/cmd/server.rs
index 9433e83..f994ee7 100644
--- a/src/cmd/server.rs
+++ b/src/cmd/server.rs
@@ -25,12 +25,19 @@ fn run_impl() -> Result<()> {
let listener = tokio::net::TcpListener::bind(&addr).context(Bind)?;
let server = listener
.incoming()
- .for_each(|_sock| {
- println!("got a connection");
- Ok(())
- })
.map_err(|e| {
eprintln!("accept failed: {}", e);
+ })
+ .for_each(|sock| {
+ crate::protocol::Message::read_async(sock)
+ .map(|msg| match msg {
+ crate::protocol::Message::StartCasting { username } => {
+ println!("got a connection from {}", username);
+ }
+ })
+ .map_err(|e| {
+ eprintln!("failed to read message: {}", e);
+ })
});
tokio::run(server);
Ok(())
diff --git a/src/main.rs b/src/main.rs
index b2fb129..6a15d83 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,5 @@
mod cmd;
-mod pb;
+mod protocol;
mod util;
fn main() {
diff --git a/src/pb.rs b/src/pb.rs
deleted file mode 100644
index 7503dfc..0000000
--- a/src/pb.rs
+++ /dev/null
@@ -1 +0,0 @@
-pub mod termcast;
diff --git a/src/protocol.rs b/src/protocol.rs
new file mode 100644
index 0000000..bcddc6a
--- /dev/null
+++ b/src/protocol.rs
@@ -0,0 +1,115 @@
+use futures::future::Future as _;
+use snafu::futures01::FutureExt as _;
+use snafu::ResultExt as _;
+use std::convert::{TryFrom as _, TryInto as _};
+
+#[derive(Debug, snafu::Snafu)]
+pub enum Error {
+ #[snafu(display("failed to read packet: {}", source))]
+ ReadAsync { source: tokio::io::Error },
+
+ #[snafu(display("failed to write packet: {}", source))]
+ Write { source: std::io::Error },
+
+ #[snafu(display("invalid StartCasting message: {}", source))]
+ ParseStartCastingMessage { source: std::string::FromUtf8Error },
+
+ #[snafu(display("invalid message type: {}", ty))]
+ InvalidMessageType { ty: u32 },
+}
+
+pub type Result<T> = std::result::Result<T, Error>;
+
+pub enum Message {
+ StartCasting { username: String },
+}
+
+impl Message {
+ pub fn start_casting(username: &str) -> Message {
+ Message::StartCasting {
+ username: username.to_string(),
+ }
+ }
+
+ pub fn read_async<R: tokio::io::AsyncRead>(
+ r: R,
+ ) -> impl futures::future::Future<Item = Self, Error = Error> {
+ Packet::read_async(r).and_then(Self::try_from)
+ }
+
+ pub fn write<W: std::io::Write>(&self, w: W) -> Result<()> {
+ Packet::from(self).write(w)
+ }
+}
+
+struct Packet {
+ ty: u32,
+ data: Vec<u8>,
+}
+
+impl Packet {
+ fn read_async<R: tokio::io::AsyncRead>(
+ r: R,
+ ) -> impl futures::future::Future<Item = Self, Error = Error> {
+ let header_buf = [0u8; std::mem::size_of::<u32>() * 2];
+ tokio::io::read_exact(r, header_buf)
+ .and_then(|(r, buf)| {
+ let (len_buf, ty_buf) =
+ buf.split_at(std::mem::size_of::<u32>());
+ let len = u32::from_le_bytes(len_buf.try_into().unwrap());
+ let ty = u32::from_le_bytes(ty_buf.try_into().unwrap());
+ futures::future::ok((r, len, ty))
+ })
+ .and_then(|(r, len, ty)| {
+ let body_buf = vec![0u8; len as usize];
+ tokio::io::read_exact(r, body_buf)
+ .map(move |(_, buf)| (ty, buf))
+ })
+ .and_then(|(ty, buf)| {
+ futures::future::ok(Packet {
+ ty,
+ data: buf.to_vec(),
+ })
+ })
+ .context(ReadAsync)
+ }
+
+ fn write<W: std::io::Write>(&self, mut w: W) -> Result<()> {
+ Ok(w.write_all(&self.as_bytes()).context(Write)?)
+ }
+
+ fn as_bytes(&self) -> Vec<u8> {
+ let len = (self.data.len() as u32).to_le_bytes();
+ let ty = self.ty.to_le_bytes();
+ len.iter()
+ .chain(ty.iter())
+ .chain(self.data.iter())
+ .cloned()
+ .collect()
+ }
+}
+
+impl From<&Message> for Packet {
+ fn from(msg: &Message) -> Self {
+ match msg {
+ Message::StartCasting { username } => Packet {
+ ty: 0,
+ data: username.as_bytes().to_vec(),
+ },
+ }
+ }
+}
+
+impl std::convert::TryFrom<Packet> for Message {
+ type Error = Error;
+
+ fn try_from(packet: Packet) -> Result<Self> {
+ match packet.ty {
+ 0 => Ok(Message::StartCasting {
+ username: std::string::String::from_utf8(packet.data)
+ .context(ParseStartCastingMessage)?,
+ }),
+ _ => Err(Error::InvalidMessageType { ty: packet.ty }),
+ }
+ }
+}