167 lines
5.1 KiB
Rust
167 lines
5.1 KiB
Rust
use crossbeam_channel::{unbounded, Receiver, Sender, TryRecvError};
|
|
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
|
|
use std::{io::ErrorKind, sync::Arc, thread, time};
|
|
|
|
use super::{Connection, SocketConnection};
|
|
use error::{Error, Result};
|
|
use models::Message;
|
|
|
|
type Tx = Sender<Message>;
|
|
type Rx = Receiver<Message>;
|
|
|
|
/// Lifecycle of the connection shared between a `Manager`, its clones and the
/// background worker thread.
///
/// Transitions performed in this file:
/// `Disconnected -> Connecting -> Connected` (via `Manager::connect`),
/// any state other than `Disconnected` `-> Disconnecting` (via `Manager::disconnect`),
/// and back to `Disconnected` when a disconnect request is noticed or the
/// worker hits an I/O error.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ConnectionState {
    /// No socket is open and no connection attempt is in progress.
    Disconnected,
    /// A `connect` call has claimed the attempt and is opening the socket / handshaking.
    Connecting,
    /// Handshake finished; the worker thread is exchanging messages.
    Connected,
    /// A disconnect has been requested but not yet acted upon.
    Disconnecting,
}
|
|
|
|
#[derive(Clone, Debug)]
|
|
pub struct Manager {
|
|
state: Arc<RwLock<ConnectionState>>,
|
|
client_id: u64,
|
|
outbound: (Rx, Tx),
|
|
inbound: (Rx, Tx),
|
|
}
|
|
|
|
impl Manager {
|
|
pub fn new(client_id: u64) -> Self {
|
|
let (sender_o, receiver_o) = unbounded();
|
|
let (sender_i, receiver_i) = unbounded();
|
|
|
|
Self {
|
|
state: Arc::new(RwLock::new(ConnectionState::Disconnected)),
|
|
client_id,
|
|
inbound: (receiver_i, sender_i),
|
|
outbound: (receiver_o, sender_o),
|
|
}
|
|
}
|
|
|
|
pub fn client_id(&self) -> u64 {
|
|
self.client_id
|
|
}
|
|
|
|
pub fn send(&self, message: Message) -> Result<()> {
|
|
self.outbound.1.send(message)?;
|
|
Ok(())
|
|
}
|
|
|
|
// TODO: timeout
|
|
pub fn recv(&self) -> Result<Message> {
|
|
while *self.state.read() == ConnectionState::Connected {
|
|
match self.inbound.0.try_recv() {
|
|
Ok(message) => return Ok(message),
|
|
Err(TryRecvError::Empty) => {}
|
|
Err(TryRecvError::Disconnected) => break
|
|
}
|
|
}
|
|
|
|
Err(Error::ConnectionClosed)
|
|
}
|
|
|
|
pub fn connect(&mut self) -> Result<()> {
|
|
// check with a read lock first
|
|
let state = self.state.upgradable_read();
|
|
match *state {
|
|
ConnectionState::Disconnected => {
|
|
// no need to double-check after this because the read lock is upgraded instead of replace with a write lock (no possibility of mutation before the upgrade).
|
|
let mut state = RwLockUpgradableReadGuard::upgrade(state);
|
|
*state = ConnectionState::Connecting;
|
|
|
|
// we are in the connecting state and no longer need a lock to prevent the creation of many threads.
|
|
drop(state);
|
|
|
|
debug!("Connecting");
|
|
|
|
let mut new_connection = SocketConnection::connect()?;
|
|
|
|
debug!("Performing handshake");
|
|
new_connection.handshake(self.client_id)?;
|
|
debug!("Handshake completed");
|
|
|
|
let mut state = self.state.write();
|
|
match *state {
|
|
ConnectionState::Disconnected => panic!("Illegal state: {:?}", *state),
|
|
ConnectionState::Connecting => {
|
|
*state = ConnectionState::Connected;
|
|
|
|
drop(state);
|
|
|
|
debug!("Connected");
|
|
|
|
let state_arc = self.state.clone();
|
|
let inbound = self.inbound.1.clone();
|
|
let outbound = self.outbound.0.clone();
|
|
thread::spawn(move || {
|
|
send_and_receive_loop(state_arc, inbound, outbound, new_connection);
|
|
});
|
|
|
|
Ok(())
|
|
}
|
|
ConnectionState::Connected => panic!("Illegal state: {:?}", *state),
|
|
ConnectionState::Disconnecting => {
|
|
*state = ConnectionState::Disconnected;
|
|
|
|
Err(Error::ConnectionClosed)
|
|
}
|
|
}
|
|
}
|
|
ConnectionState::Connecting => Ok(()),
|
|
ConnectionState::Connected => Ok(()),
|
|
ConnectionState::Disconnecting => Err(Error::Busy),
|
|
}
|
|
}
|
|
|
|
pub fn disconnect(&mut self) {
|
|
let state = self.state.upgradable_read();
|
|
if *state != ConnectionState::Disconnected {
|
|
let mut state = RwLockUpgradableReadGuard::upgrade(state);
|
|
*state = ConnectionState::Disconnecting;
|
|
}
|
|
}
|
|
}
|
|
|
|
fn send_and_receive_loop(
|
|
state: Arc<RwLock<ConnectionState>>,
|
|
mut inbound: Sender<Message>,
|
|
outbound: Receiver<Message>,
|
|
mut conn: SocketConnection,
|
|
) {
|
|
debug!("Starting sender loop");
|
|
|
|
loop {
|
|
match send_and_receive(&mut conn, &mut inbound, &outbound) {
|
|
Err(Error::IoError(ref err)) if err.kind() == ErrorKind::WouldBlock => {}
|
|
Err(Error::IoError(_)) | Err(Error::ConnectionClosed) => {
|
|
let mut state = state.write();
|
|
*state = ConnectionState::Disconnected;
|
|
break;
|
|
}
|
|
Err(e) => error!("send_and_receive error: {}", e),
|
|
_ => {}
|
|
}
|
|
|
|
thread::sleep(time::Duration::from_millis(500));
|
|
}
|
|
}
|
|
|
|
fn send_and_receive(
|
|
connection: &mut SocketConnection,
|
|
inbound: &mut Tx,
|
|
outbound: &Rx,
|
|
) -> Result<()> {
|
|
loop {
|
|
match outbound.try_recv() {
|
|
Ok(msg) => connection.send(msg)?,
|
|
Err(TryRecvError::Empty) => break,
|
|
Err(TryRecvError::Disconnected) => return Err(Error::ConnectionClosed),
|
|
}
|
|
}
|
|
|
|
let msg = connection.recv()?;
|
|
inbound.send(msg)?;
|
|
|
|
Ok(())
|
|
}
|