From caaae615f06d53a6eac6f77a4c7f4df73fbb969c Mon Sep 17 00:00:00 2001 From: Patrick Auernig Date: Thu, 6 Dec 2018 00:47:18 +0100 Subject: [PATCH] Move handshake and ping into Connection trait --- Cargo.toml | 1 + src/connection/base.rs | 62 +++++++++++++++++++++++++++++++++++++----- src/error.rs | 2 ++ src/lib.rs | 1 + 4 files changed, 59 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fb75065..5700a2b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ serde_derive = "^1.0" serde_json = "^1.0" byteorder = "^1.0" log = "~0.4" +bytes = "^0.4" [target.'cfg(windows)'.dependencies] named_pipe = "0.3.0" diff --git a/src/connection/base.rs b/src/connection/base.rs index 595347e..59641bd 100644 --- a/src/connection/base.rs +++ b/src/connection/base.rs @@ -1,17 +1,37 @@ use std::{ - io::{Write, Read}, + io::{Write, Read, ErrorKind}, marker::Sized, path::PathBuf, + thread, + time, }; -use models::message::Message; -use error::Result; +use bytes::BytesMut; + +use utils; +use models::message::{Message, OpCode}; +use error::{Error, Result}; + + +/// Wait for a non-blocking connection until it's complete. +macro_rules! try_until_done { + [ $e:expr ] => { + loop { + match $e { + Ok(_) => break, + Err(Error::IoError(ref err)) if err.kind() == ErrorKind::WouldBlock => (), + Err(why) => return Err(why), + } + + thread::sleep(time::Duration::from_millis(500)); + } + } +} pub trait Connection: Sized { type Socket: Write + Read; - fn socket(&mut self) -> &mut Self::Socket; fn ipc_path() -> PathBuf; @@ -24,6 +44,26 @@ pub trait Connection: Sized { Self::ipc_path().join(format!("discord-ipc-{}", n)) } + fn handshake(&mut self, client_id: u64) -> Result<()> { + let hs = json![{ + "client_id": client_id.to_string(), + "v": 1, + "nonce": utils::nonce() + }]; + + try_until_done!(self.send(Message::new(OpCode::Handshake, hs.clone()))); + try_until_done!(self.recv()); + + Ok(()) + } + + fn ping(&mut self) -> Result { + let message = Message::new(OpCode::Ping, json![{}]); + self.send(message)?; + let response = self.recv()?; + Ok(response.opcode) + } + fn send(&mut self, message: Message) -> Result<()> { match message.encode() { Err(why) => error!("{:?}", why), @@ -36,11 +76,19 @@ pub trait Connection: Sized { } fn recv(&mut self) -> Result { - let mut buf: Vec = vec![0; 1024]; - let n = self.socket().read(buf.as_mut_slice())?; - buf.resize(n, 0); + let mut buf = BytesMut::new(); + buf.resize(1024, 0); + let n = self.socket().read(&mut buf)?; + debug!("Received {} bytes", n); + + if n == 0 { + return Err(Error::ConnectionClosed); + } + + buf = buf.split_to(n); let message = Message::decode(&buf)?; debug!("<- {:?}", message); + Ok(message) } } diff --git a/src/error.rs b/src/error.rs index 0ccfc93..16a24f7 100644 --- a/src/error.rs +++ b/src/error.rs @@ -19,6 +19,7 @@ pub enum Error { Timeout(ChannelTimeout), Conversion, SubscriptionFailed, + ConnectionClosed, } impl Display for Error { @@ -32,6 +33,7 @@ impl StdError for Error { match *self { Error::Conversion => "Failed to convert values", Error::SubscriptionFailed => "Failed to subscribe to event", + Error::ConnectionClosed => "Connection closed", Error::IoError(ref err) => err.description(), Error::JsonError(ref err) => err.description(), Error::Timeout(ref err) => err.description(), diff --git a/src/lib.rs b/src/lib.rs index cae9421..ae69233 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,7 @@ extern crate serde; extern crate serde_json; extern crate byteorder; extern crate uuid; +extern crate bytes; #[cfg(windows)] extern crate named_pipe;