Compare commits

...

5 Commits

Author SHA1 Message Date
Michael Pfaff 910871dcf9
Update CHANGELOG.md 2022-02-03 17:01:02 -05:00
Michael Pfaff ad78e64e64
Bump version 2022-02-03 17:00:42 -05:00
Michael Pfaff 30eebae644
Cleanup some stuff 2022-02-03 16:57:40 -05:00
Michael Pfaff d99145133d
Add partial Java bindings 2022-02-03 16:56:26 -05:00
Michael Pfaff 04af242ca7
Overhaul (part 2)
- No more worker thread
- Fully async io thanks to Tokio
- Updated libs
- Client is no longer bound to a specific client id
2022-02-03 15:56:37 -05:00
32 changed files with 812 additions and 479 deletions

View File

@ -1,13 +0,0 @@
module Overcommit::Hook::PrePush
# Runs `cargo test` before push
#
class CargoTest < Base
def run
result = execute(command)
return :pass if result.success?
output = result.stdout + result.stderr
[:fail, output]
end
end
end

5
.gitignore vendored
View File

@ -1,6 +1,5 @@
target/
**/*.rs.bk
Cargo.lock
/target/
/Cargo.lock
# CI
/.rustup

View File

@ -1,99 +0,0 @@
image: "debian:stretch-slim"
stages:
- prepare
- build
- publish
variables:
RUSTUP_HOME: "$CI_PROJECT_DIR/.rustup"
CARGO_HOME: "$CI_PROJECT_DIR/.cargo"
.write_cache:
cache:
key: "$CI_COMMIT_REF_SLUG-rustup"
paths:
- .rustup
- .cargo
.read_cache:
cache:
key: "$CI_COMMIT_REF_SLUG-rustup"
paths:
- .rustup
- .cargo
policy: pull
.toolchain-stable:
extends: .read_cache
before_script:
- apt-get update
- apt-get install -y --no-install-recommends ca-certificates libssl-dev libc6-dev pkg-config gcc
- export PATH="$CARGO_HOME/bin:$PATH"
- rustup default stable
.toolchain-nightly:
extends: .read_cache
allow_failure: true
before_script:
- apt-get update
- apt-get install -y --no-install-recommends ca-certificates libssl-dev libc6-dev pkg-config gcc
- export PATH="$CARGO_HOME/bin:$PATH"
- rustup default nightly
.build-only-when-changes: &build-only-when-changes
only:
changes:
- Cargo.toml
- Cargo.lock
- src/**/*.rs
####### Jobs #######
install-rust:
extends: .write_cache
stage: prepare
script:
- apt-get update
- apt-get install -y --no-install-recommends ca-certificates curl
- curl https://sh.rustup.rs > rustup.sh
- sh rustup.sh -y --default-host x86_64-unknown-linux-gnu
- export PATH="$CARGO_HOME/bin:$PATH"
- rustup install stable
- rustup install nightly
<<: *build-only-when-changes
build-stable-no-default-features:
extends: .toolchain-stable
stage: build
script:
- cargo test --tests --no-default-features
<<: *build-only-when-changes
build-stable-features-rich_presence:
extends: .toolchain-stable
stage: build
script:
- cargo test --tests --no-default-features
--features "rich_presence"
- cargo test --no-default-features
--features "rich_presence"
--example "discord_presence"
--example "discord_presence_subscribe"
<<: *build-only-when-changes
build-nightly:
extends: .toolchain-nightly
stage: build
script:
- cargo test --tests
- cargo test --examples
<<: *build-only-when-changes
deploy-crates-io:
extends: .toolchain-stable
stage: publish
script:
- cargo publish --token $CRATES_IO_API_TOKEN
only:
- tags@valeth/discord-rpc-client.rs

View File

@ -1,13 +0,0 @@
PreCommit:
TrailingWhitespace:
enabled: true
exclude:
- 'target/**/*'
PrePush:
CargoTest:
enabled: true
description: 'Run Cargo tests'
required_executable: 'cargo'
flags: ['test', '--all']
include: "**/*.rs"

View File

@ -7,6 +7,25 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased]
## [0.5.1] - 2021-02-03
### Changed
- Add partial Java bindings (missing `disconnect`, `clear_activity`, `send_activity_join_invite`, `close_activity_request`, `subscribe`, `unsubscribe`)
## [0.5.0] - 2021-02-03
### Changed
- Rewrite `Client`, eliminating `ConnectionManager`
- `Client` is now fully async and no worker thread is needed
## [0.4.0] - 2021-02-03
### Changed
- Update libs
- Update to Rust edition 2021
- Connection manager mostly rewritten
- Added support for Discord installed as a flatpak
- Reformat
- Derive `Debug` on more types
- Disconnect actually works now
## [0.3.0] - 2018-12-06
### Changed

View File

@ -5,9 +5,7 @@ Contributions to this project are welcome, just follow these steps.
1. Fork this repository and create a feature branch named after the feature you want to implement
2. Make your changes on your branch
3. Add some test if possibe
4. Make sure all tests pass (I recommend installing [Overcommit][overcommit])
4. Make sure all tests pass
5. Submit a PR/MR on GitHub or GitLab
> **Note**: Make sure you rebase your feature branch on top of master from time to time.
[overcommit]: https://github.com/brigade/overcommit

View File

@ -1,33 +1,47 @@
[package]
authors = ["Patrick Auernig <dev.patrick.auernig@gmail.com>", "Michael Pfaff <michael@pfaff.dev>"]
name = "discord-rpc-client"
name = "discord-rpc-client"
version = "0.5.1"
description = "A Rust client for Discord RPC."
keywords = ["discord", "rpc", "ipc"]
license = "MIT"
readme = "README.md"
repository = "https://gitlab.com/valeth/discord-rpc-client.rs.git"
version = "0.4.0"
[dependencies]
serde = "^1.0"
serde_derive = "^1.0"
serde_json = "^1.0"
byteorder = "^1.0"
log = "~0.4"
bytes = "^0.4"
parking_lot = "^0.7"
crossbeam-channel = "^0.3"
[target.'cfg(windows)'.dependencies]
named_pipe = "0.3.0"
[dependencies.uuid]
version = "^0.6.2"
features = ["v4"]
[dev-dependencies]
simplelog = "~0.5"
authors = [
"Patrick Auernig <dev.patrick.auernig@gmail.com>",
"Michael Pfaff <michael@pfaff.dev>",
]
keywords = ["discord", "rpc", "ipc"]
license = "MIT"
readme = "README.md"
repository = "https://gitlab.com/valeth/discord-rpc-client.rs.git"
edition = "2021"
[features]
default = ["rich_presence"]
rich_presence = []
tokio-parking_lot = ["tokio/parking_lot"]
java-bindings = ["lazy_static", "jni", "tokio/rt-multi-thread"]
[dependencies]
async-trait = "0.1.52"
jni = { version = "0.19", optional = true }
lazy_static = { version = "1.4", optional = true }
tokio = { version = "1.16", features = ["io-util", "net", "sync", "macros", "rt"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
byteorder = "1.0"
log = "0.4"
bytes = "1.1.0"
uuid = { version = "0.8", features = ["v4"] }
[dev-dependencies]
simplelog = "0.11"
tokio = { version = "1.16", features = [
"time",
"rt-multi-thread",
"macros",
"parking_lot",
] }
# this is a workaround to not being able to specify either multiple libs or conditional compilation based on crate-type.
[[example]]
name = "discord_rpc_java"
path = "examples/java.rs"
crate-type = ["cdylib"]
required-features = ["java-bindings"]

View File

@ -1,22 +0,0 @@
version: 1.0.{build}
environment:
matrix:
- TARGET: x86_64-pc-windows-msvc
install:
- appveyor-retry appveyor DownloadFile https://win.rustup.rs/ -FileName rustup-init.exe
- rustup-init.exe -y --default-host x86_64-pc-windows-msvc --default-toolchain stable
- set PATH=%PATH%;C:\Users\appveyor\.cargo\bin
- rustup -V
- cargo -V
clone_depth: 1
skip_tags: true
build: false
test_script:
- cargo test --all
- cargo test --examples

View File

@ -1,16 +1,20 @@
extern crate discord_rpc_client;
extern crate simplelog;
use discord_rpc_client::{models::Activity, Client as DiscordRPC};
use simplelog::*;
use std::io;
fn main() -> discord_rpc_client::Result<()> {
TermLogger::init(LevelFilter::Debug, Config::default()).unwrap();
#[tokio::main]
async fn main() -> discord_rpc_client::Result<()> {
TermLogger::init(
LevelFilter::Debug,
Config::default(),
TerminalMode::Mixed,
ColorChoice::Always,
)
.unwrap();
let mut drpc = DiscordRPC::new(425407036495495169);
let drpc = DiscordRPC::default();
drpc.connect()?;
drpc.connect(425407036495495169).await?;
loop {
let mut buf = String::new();
@ -19,16 +23,19 @@ fn main() -> discord_rpc_client::Result<()> {
buf.pop();
if buf.is_empty() {
if let Err(why) = drpc.clear_activity() {
if let Err(why) = drpc.clear_activity().await {
println!("Failed to clear presence: {}", why);
}
} else {
if let Err(why) = drpc.set_activity(Activity::new().state(buf).assets(|ass| {
ass.large_image("ferris_wat")
.large_text("wat.")
.small_image("rusting")
.small_text("rusting...")
})) {
if let Err(why) = drpc
.set_activity(Activity::new().state(buf).assets(|ass| {
ass.large_image("ferris_wat")
.large_text("wat.")
.small_image("rusting")
.small_text("rusting...")
}))
.await
{
println!("Failed to set presence: {}", why);
}
}

View File

@ -1,30 +1,38 @@
extern crate discord_rpc_client;
extern crate simplelog;
use discord_rpc_client::{models::Event, Client as DiscordRPC};
use simplelog::*;
use std::{thread, time};
fn main() -> discord_rpc_client::Result<()> {
TermLogger::init(LevelFilter::Debug, Config::default()).unwrap();
#[tokio::main]
async fn main() -> discord_rpc_client::Result<()> {
TermLogger::init(
LevelFilter::Debug,
Config::default(),
TerminalMode::Mixed,
ColorChoice::Always,
)
.unwrap();
let mut drpc = DiscordRPC::new(425407036495495169);
let drpc = DiscordRPC::default();
drpc.connect()?;
drpc.connect(425407036495495169).await?;
drpc.subscribe(Event::ActivityJoin, |j| j.secret("123456"))
.await
.expect("Failed to subscribe to event");
drpc.subscribe(Event::ActivitySpectate, |s| s.secret("123456"))
.await
.expect("Failed to subscribe to event");
drpc.subscribe(Event::ActivityJoinRequest, |s| s)
.await
.expect("Failed to subscribe to event");
drpc.unsubscribe(Event::ActivityJoinRequest, |j| j)
.await
.expect("Failed to unsubscribe from event");
loop {
thread::sleep(time::Duration::from_millis(500));
tokio::time::sleep(time::Duration::from_millis(500)).await;
}
}

21
examples/java.rs Normal file
View File

@ -0,0 +1,21 @@
use discord_rpc_client::java::*;
use jni::JNIEnv;
use jni::objects::{JClass, JString, JObject};
// can't just put these in src/java.rs and reexport because of some tree-shaking that the compiler does.
#[no_mangle]
pub extern "system" fn Java_com_discord_rpc_DiscordRPC_create<'a>(env: JNIEnv<'a>, class: JClass) -> JObject<'a> {
Java_com_discord_rpc_DiscordRPC_create0(env, class).unwrap_or(JObject::null())
}
#[no_mangle]
pub extern "system" fn Java_com_discord_rpc_DiscordRPC_connect(env: JNIEnv, obj: JObject, client_id: JString) -> bool {
Java_com_discord_rpc_DiscordRPC_connect0(env, obj, client_id).is_ok()
}
#[no_mangle]
pub extern "system" fn Java_com_discord_rpc_DiscordRPC_setActivity(env: JNIEnv, obj: JObject, j_activity: JObject) -> bool {
Java_com_discord_rpc_DiscordRPC_setActivity0(env, obj, j_activity).is_ok()
}

3
java/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
/.bsp/
/target/
/project/

8
java/build.sbt Normal file
View File

@ -0,0 +1,8 @@
ThisBuild / organization := "com.discord"
ThisBuild / version := "0.2.0"
lazy val hello = (project in file("."))
.settings(
name := "discord-rpc"
)

View File

@ -0,0 +1,61 @@
package com.discord.rpc;
public final class Activity {
public String state;
public String details;
public boolean instance;
public ActivityTimestamps timestamps;
public ActivityAssets assets;
public ActivityParty party;
public ActivitySecrets secrets;
public Activity() {
this.state = null;
this.details = null;
this.instance = false;
this.timestamps = null;
this.assets = null;
this.party = null;
this.secrets = null;
}
public Activity withState(String state) {
this.state = state;
return this;
}
public Activity withDetails(String details) {
this.details = details;
return this;
}
public Activity withInstance(boolean instance) {
this.instance = instance;
return this;
}
public Activity withTimestamps(ActivityTimestamps timestamps) {
this.timestamps = timestamps;
return this;
}
public Activity withAssets(ActivityAssets assets) {
this.assets = assets;
return this;
}
public Activity withParty(ActivityParty party) {
this.party = party;
return this;
}
public Activity withSecrets(ActivitySecrets secrets) {
this.secrets = secrets;
return this;
}
public Activity copy() {
return new Activity().withState(this.state).withDetails(this.details).withInstance(this.instance).withTimestamps(this.timestamps).withAssets(this.assets).withParty(this.party).withSecrets(this.secrets);
}
}

View File

@ -0,0 +1,9 @@
package com.discord.rpc;
public record ActivityAssets(
String largeImage,
String largeText,
String smallImage,
String smallText
) {}

View File

@ -0,0 +1,4 @@
package com.discord.rpc;
public record ActivityParty(@Unsigned int id, @Unsigned int minSize, @Unsigned int maxSize) {}

View File

@ -0,0 +1,4 @@
package com.discord.rpc;
public record ActivitySecrets(String join, String spectate, String game) {}

View File

@ -0,0 +1,4 @@
package com.discord.rpc;
public record ActivityTimestamps(@Unsigned Long start, @Unsigned Long end) {}

View File

@ -0,0 +1,30 @@
package com.discord.rpc;
import java.io.File;
public final class DiscordRPC {
private final long handle;
private DiscordRPC(long handle) {
this.handle = handle;
}
/**
* @return the new client instance, or null if an error occurred.
*/
public static native DiscordRPC create();
public native boolean connect(String clientId);
public native boolean setActivity(Activity activity);
static {
final var dir = System.getProperty("com.discord.librarypath");
if (dir != null) {
System.load(dir + File.separator + System.mapLibraryName("discord_rpc"));
} else {
System.loadLibrary("discord_rpc");
}
}
}

View File

@ -0,0 +1,7 @@
package com.discord.rpc;
/**
* Marks an integer that will be re-interpreted natively as unsigned. Use Kotlin's unsigned types with these.
*/
@interface Unsigned {}

View File

@ -1,45 +1,271 @@
use serde::{de::DeserializeOwned, Serialize};
#[allow(unused)]
use serde_json::Value;
use tokio::select;
use tokio::sync::watch::Ref;
use tokio::sync::{watch, Mutex};
use connection::Manager as ConnectionManager;
use error::{Error, Result};
use crate::connection::{Connection, SocketConnection};
use crate::error::{Error, Result};
#[cfg(feature = "rich_presence")]
use models::rich_presence::{
use crate::models::rich_presence::{
Activity, CloseActivityRequestArgs, SendActivityJoinInviteArgs, SetActivityArgs,
};
use models::{
use crate::models::{
commands::{Subscription, SubscriptionArgs},
message::Message,
payload::Payload,
Command, Event, OpCode,
};
#[derive(Clone, Debug)]
#[repr(transparent)]
macro_rules! hollow {
($expr:expr) => {{
let ref_ = $expr.borrow();
ref_.hollow()
}};
}
#[derive(Debug)]
enum ConnectionState<T> {
Disconnected,
Connecting,
Connected(T),
Disconnecting,
}
impl<T: Clone> Clone for ConnectionState<T> {
fn clone(&self) -> Self {
match self {
Self::Disconnected => Self::Disconnected,
Self::Connecting => Self::Connecting,
Self::Connected(arg0) => Self::Connected(arg0.clone()),
Self::Disconnecting => Self::Disconnecting,
}
}
}
impl<T: Copy> Copy for ConnectionState<T> {}
impl<T> ConnectionState<T> {
pub fn is_disconnected(&self) -> bool {
match self {
ConnectionState::Disconnected => true,
_ => false,
}
}
pub fn is_connecting(&self) -> bool {
match self {
ConnectionState::Connecting => true,
_ => false,
}
}
pub fn is_connected(&self) -> bool {
match self {
ConnectionState::Connected(_) => true,
_ => false,
}
}
pub fn is_disconnecting(&self) -> bool {
match self {
ConnectionState::Disconnecting => true,
_ => false,
}
}
}
impl<T> ConnectionState<T> {
pub fn hollow(&self) -> ConnectionState<()> {
match self {
ConnectionState::Disconnected => ConnectionState::Disconnected,
ConnectionState::Connecting => ConnectionState::Connecting,
ConnectionState::Connected(_) => ConnectionState::Connected(()),
ConnectionState::Disconnecting => ConnectionState::Disconnecting,
}
}
}
macro_rules! yield_while {
($receive:expr, $pat:pat) => {{
let mut new_state: _;
loop {
new_state = $receive;
match new_state {
$pat => tokio::task::yield_now().await,
_ => break,
}
}
new_state
}};
}
#[derive(Debug)]
pub struct Client {
connection_manager: ConnectionManager,
state: (
watch::Sender<ConnectionState<(u64, Mutex<SocketConnection>)>>,
watch::Receiver<ConnectionState<(u64, Mutex<SocketConnection>)>>,
),
update: Mutex<()>,
}
impl Default for Client {
fn default() -> Self {
Self {
state: watch::channel(ConnectionState::Disconnected),
update: Mutex::new(()),
}
}
}
impl Client {
pub fn new(client_id: u64) -> Self {
let connection_manager = ConnectionManager::new(client_id);
Self { connection_manager }
/// Returns the client id used by the current connection, or [`None`] if the client is not [`ConnectionState::Connected`].
pub fn client_id(&self) -> Option<u64> {
match *self.state.1.borrow() {
ConnectionState::Connected((client_id, _)) => Some(client_id),
_ => None,
}
}
pub fn client_id(&self) -> u64 {
self.connection_manager.client_id()
async fn connect_and_handshake(client_id: u64) -> Result<SocketConnection> {
debug!("Connecting");
let mut new_connection = SocketConnection::connect().await?;
debug!("Performing handshake");
new_connection.handshake(client_id).await?;
debug!("Handshake completed");
Ok(new_connection)
}
pub fn connect(&mut self) -> Result<()> {
self.connection_manager.connect()
async fn connect0(&self, client_id: u64, conn: Result<SocketConnection>) -> Result<()> {
let _state_guard = self.update.lock().await;
match hollow!(self.state.1) {
state @ ConnectionState::Disconnected => panic!(
"Illegal state during connection process {:?} -> {:?}",
ConnectionState::<()>::Connecting,
state
),
ConnectionState::Connecting => {
self.state
.0
.send(ConnectionState::Connected((client_id, Mutex::new(conn?))))
.expect("the receiver cannot be dropped without the sender!");
debug!("Connected");
Ok(())
}
ConnectionState::Connected(_) => panic!("Illegal concurrent connection!"),
ConnectionState::Disconnecting => {
match conn {
Ok(conn) => match conn.disconnect().await {
Err(e) => {
error!("failed to disconnect properly: {}", e);
}
_ => {}
},
Err(e) => {
error!("failed connection: {}", e);
}
}
self.state
.0
.send(ConnectionState::Disconnected)
.expect("the receiver cannot be dropped without the sender!");
Err(Error::ConnectionClosed)
}
}
}
pub fn disconnect(&mut self) {
self.connection_manager.disconnect()
pub async fn connect(&self, client_id: u64) -> Result<()> {
match hollow!(self.state.1) {
ConnectionState::Connected(_) => Ok(()),
_ => {
let state_guard = self.update.lock().await;
match hollow!(self.state.1) {
ConnectionState::Connected(_) => Ok(()),
ConnectionState::Disconnecting => Err(Error::ConnectionClosed),
ConnectionState::Connecting => {
match yield_while!(hollow!(self.state.1), ConnectionState::Connecting) {
ConnectionState::Connected(_) => Ok(()),
ConnectionState::Disconnecting => Err(Error::ConnectionClosed),
ConnectionState::Disconnected => Err(Error::ConnectionClosed),
ConnectionState::Connecting => unreachable!(),
}
}
ConnectionState::Disconnected => {
self.state
.0
.send(ConnectionState::Connecting)
.expect("the receiver cannot be dropped without the sender!");
drop(state_guard);
select! {
conn = Self::connect_and_handshake(client_id) => {
self.connect0(client_id, conn).await
}
// _ = tokio::task::yield_now() if self.state.1.borrow().is_disconnecting() => {
// self.state.0.send(ConnectionState::Disconnected).expect("the receiver cannot be dropped without the sender!");
// Err(Error::ConnectionClosed)
// }
}
}
}
}
}
}
fn execute<A, E>(&mut self, cmd: Command, args: A, evt: Option<Event>) -> Result<Payload<E>>
/// If currently connected, the function will close the connection.
/// If currently connecting, the function will wait for the connection to be established and will immediately close it.
/// If currently disconnecting, the function will wait for the connection to be closed.
pub async fn disconnect(&self) {
let _state_guard = self.update.lock().await;
match hollow!(self.state.1) {
ConnectionState::Disconnected => {}
ref state @ ConnectionState::Disconnecting => {
match yield_while!(hollow!(self.state.1), ConnectionState::Disconnecting) {
ConnectionState::Disconnected => {}
ConnectionState::Disconnecting => unreachable!(),
new_state => panic!("Illegal state change {:?} -> {:?}", state, new_state),
}
}
ConnectionState::Connecting => {
self.state
.0
.send(ConnectionState::Disconnecting)
.expect("the receiver cannot be dropped without the sender!");
}
state @ ConnectionState::Connected(_) => {
match self.state.0.send_replace(ConnectionState::Disconnecting) {
ConnectionState::Connected(conn) => {
match conn.1.into_inner().disconnect().await {
Err(e) => {
error!("failed to disconnect properly: {}", e);
}
_ => {}
}
}
new_state @ ConnectionState::Connecting => {
panic!("Illegal state change {:?} -> {:?}", state, new_state);
}
state @ ConnectionState::Disconnecting => {
match yield_while!(hollow!(self.state.1), ConnectionState::Disconnecting) {
ConnectionState::Disconnected => {}
ConnectionState::Disconnecting => unreachable!(),
new_state => {
panic!("Illegal state change {:?} -> {:?}", state, new_state)
}
}
}
ConnectionState::Disconnected => {}
}
}
}
}
async fn execute<A, E>(&self, cmd: Command, args: A, evt: Option<Event>) -> Result<Payload<E>>
where
A: Serialize + Send + Sync,
E: Serialize + DeserializeOwned + Send + Sync,
@ -48,58 +274,69 @@ impl Client {
OpCode::Frame,
Payload::with_nonce(cmd, Some(args), None, evt),
);
self.connection_manager.send(message)?;
let Message { payload, .. } = self.connection_manager.recv()?;
let response: Payload<E> = serde_json::from_str(&payload)?;
match response.evt {
Some(Event::Error) => Err(Error::SubscriptionFailed),
_ => Ok(response),
match *self.state.1.borrow() {
ConnectionState::Connected((_, ref conn)) => {
let mut conn = conn.lock().await;
conn.send(message).await?;
let Message { payload, .. } = conn.recv().await?;
let response: Payload<E> = serde_json::from_str(&payload)?;
match response.evt {
Some(Event::Error) => Err(Error::SubscriptionFailed),
_ => Ok(response),
}
}
_ => Err(Error::ConnectionClosed),
}
}
#[cfg(feature = "rich_presence")]
pub fn set_activity(&mut self, activity: Activity) -> Result<Payload<Activity>> {
pub async fn set_activity(&self, activity: Activity) -> Result<Payload<Activity>> {
self.execute(Command::SetActivity, SetActivityArgs::new(activity), None)
.await
}
#[cfg(feature = "rich_presence")]
pub fn clear_activity(&mut self) -> Result<Payload<Activity>> {
pub async fn clear_activity(&self) -> Result<Payload<Activity>> {
self.execute(Command::SetActivity, SetActivityArgs::default(), None)
.await
}
// NOTE: Not sure what the actual response values of
// SEND_ACTIVITY_JOIN_INVITE and CLOSE_ACTIVITY_REQUEST are,
// they are not documented.
#[cfg(feature = "rich_presence")]
pub fn send_activity_join_invite(&mut self, user_id: u64) -> Result<Payload<Value>> {
pub async fn send_activity_join_invite(&self, user_id: u64) -> Result<Payload<Value>> {
self.execute(
Command::SendActivityJoinInvite,
SendActivityJoinInviteArgs::new(user_id),
None,
)
.await
}
#[cfg(feature = "rich_presence")]
pub fn close_activity_request(&mut self, user_id: u64) -> Result<Payload<Value>> {
pub async fn close_activity_request(&self, user_id: u64) -> Result<Payload<Value>> {
self.execute(
Command::CloseActivityRequest,
CloseActivityRequestArgs::new(user_id),
None,
)
.await
}
pub fn subscribe<F>(&mut self, evt: Event, f: F) -> Result<Payload<Subscription>>
pub async fn subscribe<F>(&self, evt: Event, f: F) -> Result<Payload<Subscription>>
where
F: FnOnce(SubscriptionArgs) -> SubscriptionArgs,
{
self.execute(Command::Subscribe, f(SubscriptionArgs::new()), Some(evt))
.await
}
pub fn unsubscribe<F>(&mut self, evt: Event, f: F) -> Result<Payload<Subscription>>
pub async fn unsubscribe<F>(&self, evt: Event, f: F) -> Result<Payload<Subscription>>
where
F: FnOnce(SubscriptionArgs) -> SubscriptionArgs,
{
self.execute(Command::Unsubscribe, f(SubscriptionArgs::new()), Some(evt))
.await
}
}

View File

@ -1,79 +1,66 @@
use std::{
io::{ErrorKind, Read, Write},
marker::Sized,
path::PathBuf,
thread, time,
};
use bytes::BytesMut;
use tokio::io::{AsyncWrite, AsyncRead, AsyncWriteExt, AsyncReadExt};
use error::{Error, Result};
use models::message::{Message, OpCode};
use utils;
use crate::error::{Error, Result};
use crate::models::message::{Message, OpCode};
use crate::utils;
/// Wait for a non-blocking connection until it's complete.
macro_rules! try_until_done {
[ $e:expr ] => {
loop {
match $e {
Ok(_) => break,
Err(Error::IoError(ref err)) if err.kind() == ErrorKind::WouldBlock => (),
Err(why) => return Err(why),
}
thread::sleep(time::Duration::from_millis(500));
}
}
}
pub trait Connection: Sized {
type Socket: Write + Read;
#[async_trait::async_trait]
pub trait Connection: Sized + Send {
type Socket: AsyncWrite + AsyncRead + Unpin + Send;
fn socket(&mut self) -> &mut Self::Socket;
fn ipc_path() -> PathBuf;
fn connect() -> Result<Self>;
async fn connect() -> Result<Self>;
async fn disconnect(self) -> Result<()>;
fn socket_path(n: u8) -> PathBuf {
Self::ipc_path().join(format!("discord-ipc-{}", n))
}
fn handshake(&mut self, client_id: u64) -> Result<()> {
async fn handshake(&mut self, client_id: u64) -> Result<()> {
let hs = json![{
"client_id": client_id.to_string(),
"v": 1,
"nonce": utils::nonce()
}];
try_until_done!(self.send(Message::new(OpCode::Handshake, hs.clone())));
try_until_done!(self.recv());
self.send(Message::new(OpCode::Handshake, hs.clone())).await?;
self.recv().await?;
Ok(())
}
fn ping(&mut self) -> Result<OpCode> {
async fn ping(&mut self) -> Result<OpCode> {
let message = Message::new(OpCode::Ping, json![{}]);
self.send(message)?;
let response = self.recv()?;
self.send(message).await?;
let response = self.recv().await?;
Ok(response.opcode)
}
fn send(&mut self, message: Message) -> Result<()> {
async fn send(&mut self, message: Message) -> Result<()> {
match message.encode() {
Err(why) => error!("{:?}", why),
Ok(bytes) => {
self.socket().write_all(bytes.as_ref())?;
self.socket().write_all(bytes.as_ref()).await?;
}
};
debug!("-> {:?}", message);
Ok(())
}
fn recv(&mut self) -> Result<Message> {
async fn recv(&mut self) -> Result<Message> {
let mut buf = BytesMut::new();
buf.resize(1024, 0);
let n = self.socket().read(&mut buf)?;
let n = self.socket().read(&mut buf).await?;
debug!("Received {} bytes", n);
if n == 0 {

View File

@ -1,166 +0,0 @@
use crossbeam_channel::{unbounded, Receiver, Sender, TryRecvError};
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use std::{io::ErrorKind, sync::Arc, thread, time};
use super::{Connection, SocketConnection};
use error::{Error, Result};
use models::Message;
type Tx = Sender<Message>;
type Rx = Receiver<Message>;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ConnectionState {
Disconnected,
Connecting,
Connected,
Disconnecting,
}
#[derive(Clone, Debug)]
pub struct Manager {
state: Arc<RwLock<ConnectionState>>,
client_id: u64,
outbound: (Rx, Tx),
inbound: (Rx, Tx),
}
impl Manager {
pub fn new(client_id: u64) -> Self {
let (sender_o, receiver_o) = unbounded();
let (sender_i, receiver_i) = unbounded();
Self {
state: Arc::new(RwLock::new(ConnectionState::Disconnected)),
client_id,
inbound: (receiver_i, sender_i),
outbound: (receiver_o, sender_o),
}
}
pub fn client_id(&self) -> u64 {
self.client_id
}
pub fn send(&self, message: Message) -> Result<()> {
self.outbound.1.send(message)?;
Ok(())
}
// TODO: timeout
pub fn recv(&self) -> Result<Message> {
while *self.state.read() == ConnectionState::Connected {
match self.inbound.0.try_recv() {
Ok(message) => return Ok(message),
Err(TryRecvError::Empty) => {}
Err(TryRecvError::Disconnected) => break
}
}
Err(Error::ConnectionClosed)
}
pub fn connect(&mut self) -> Result<()> {
// check with a read lock first
let state = self.state.upgradable_read();
match *state {
ConnectionState::Disconnected => {
// no need to double-check after this because the read lock is upgraded instead of replace with a write lock (no possibility of mutation before the upgrade).
let mut state = RwLockUpgradableReadGuard::upgrade(state);
*state = ConnectionState::Connecting;
// we are in the connecting state and no longer need a lock to prevent the creation of many threads.
drop(state);
debug!("Connecting");
let mut new_connection = SocketConnection::connect()?;
debug!("Performing handshake");
new_connection.handshake(self.client_id)?;
debug!("Handshake completed");
let mut state = self.state.write();
match *state {
ConnectionState::Disconnected => panic!("Illegal state: {:?}", *state),
ConnectionState::Connecting => {
*state = ConnectionState::Connected;
drop(state);
debug!("Connected");
let state_arc = self.state.clone();
let inbound = self.inbound.1.clone();
let outbound = self.outbound.0.clone();
thread::spawn(move || {
send_and_receive_loop(state_arc, inbound, outbound, new_connection);
});
Ok(())
}
ConnectionState::Connected => panic!("Illegal state: {:?}", *state),
ConnectionState::Disconnecting => {
*state = ConnectionState::Disconnected;
Err(Error::ConnectionClosed)
}
}
}
ConnectionState::Connecting => Ok(()),
ConnectionState::Connected => Ok(()),
ConnectionState::Disconnecting => Err(Error::Busy),
}
}
pub fn disconnect(&mut self) {
let state = self.state.upgradable_read();
if *state != ConnectionState::Disconnected {
let mut state = RwLockUpgradableReadGuard::upgrade(state);
*state = ConnectionState::Disconnecting;
}
}
}
fn send_and_receive_loop(
state: Arc<RwLock<ConnectionState>>,
mut inbound: Sender<Message>,
outbound: Receiver<Message>,
mut conn: SocketConnection,
) {
debug!("Starting sender loop");
loop {
match send_and_receive(&mut conn, &mut inbound, &outbound) {
Err(Error::IoError(ref err)) if err.kind() == ErrorKind::WouldBlock => {}
Err(Error::IoError(_)) | Err(Error::ConnectionClosed) => {
let mut state = state.write();
*state = ConnectionState::Disconnected;
break;
}
Err(e) => error!("send_and_receive error: {}", e),
_ => {}
}
thread::sleep(time::Duration::from_millis(500));
}
}
fn send_and_receive(
connection: &mut SocketConnection,
inbound: &mut Tx,
outbound: &Rx,
) -> Result<()> {
loop {
match outbound.try_recv() {
Ok(msg) => connection.send(msg)?,
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => return Err(Error::ConnectionClosed),
}
}
let msg = connection.recv()?;
inbound.send(msg)?;
Ok(())
}

View File

@ -1,12 +1,10 @@
mod base;
mod manager;
#[cfg(unix)]
mod unix;
#[cfg(windows)]
mod windows;
pub use self::base::Connection;
pub use self::manager::Manager;
#[cfg(unix)]
pub use self::unix::UnixConnection as SocketConnection;
#[cfg(windows)]

View File

@ -1,23 +1,21 @@
use std::{env, net::Shutdown, os::unix::net::UnixStream, path::PathBuf, time};
use std::{env, path::PathBuf};
use tokio::{io::AsyncWriteExt, net::UnixStream};
use super::base::Connection;
use error::Result;
use crate::error::Result;
#[derive(Debug)]
pub struct UnixConnection {
socket: UnixStream,
}
#[async_trait::async_trait]
impl Connection for UnixConnection {
type Socket = UnixStream;
fn connect() -> Result<Self> {
let connection_name = Self::socket_path(0);
let socket = UnixStream::connect(connection_name)?;
socket.set_nonblocking(true)?;
socket.set_write_timeout(Some(time::Duration::from_secs(30)))?;
socket.set_read_timeout(Some(time::Duration::from_secs(30)))?;
Ok(Self { socket })
fn socket(&mut self) -> &mut Self::Socket {
&mut self.socket
}
fn ipc_path() -> PathBuf {
@ -40,15 +38,14 @@ impl Connection for UnixConnection {
.unwrap_or_else(|| PathBuf::from("/tmp"))
}
fn socket(&mut self) -> &mut Self::Socket {
&mut self.socket
async fn connect() -> Result<Self> {
let connection_name = Self::socket_path(0);
let socket = UnixStream::connect(connection_name).await?;
Ok(Self { socket })
}
}
impl Drop for UnixConnection {
fn drop(&mut self) {
self.socket
.shutdown(Shutdown::Both)
.expect("Failed to properly shut down socket");
async fn disconnect(mut self) -> Result<()> {
self.socket.shutdown().await?;
Ok(())
}
}

View File

@ -1,31 +1,33 @@
use std::{path::PathBuf, time};
use super::base::Connection;
use error::Result;
use tokio::net::windows::named_pipe::{ClientOptions, NamedPipeClient};
use named_pipe::PipeClient;
use super::base::Connection;
use crate::error::Result;
#[derive(Debug)]
pub struct WindowsConnection {
socket: PipeClient,
socket: NamedPipeClient,
}
#[async_trait::async_trait]
impl Connection for WindowsConnection {
type Socket = PipeClient;
type Socket = NamedPipeClient;
fn connect() -> Result<Self> {
let connection_name = Self::socket_path(0);
let mut socket = PipeClient::connect(connection_name)?;
socket.set_write_timeout(Some(time::Duration::from_secs(30)));
socket.set_read_timeout(Some(time::Duration::from_secs(30)));
Ok(Self { socket })
fn socket(&mut self) -> &mut Self::Socket {
&mut self.socket
}
fn ipc_path() -> PathBuf {
PathBuf::from(r"\\.\pipe\")
}
fn socket(&mut self) -> &mut Self::Socket {
&mut self.socket
async fn connect() -> Result<Self> {
let connection_name = Self::socket_path(0);
Ok(Self { socket: ClientOptions::new().open(connection_name)? })
}
async fn disconnect(mut self) -> Result<()> {
Ok(())
}
}

View File

@ -1,5 +1,5 @@
use crossbeam_channel::SendError;
use serde_json::Error as JsonError;
use tokio::sync::mpsc::error::SendError;
use std::{
error::Error as StdError,
fmt::{self, Display, Formatter},
@ -63,4 +63,4 @@ impl From<SendError<Message>> for Error {
}
}
pub type Result<T> = StdResult<T, Error>;
pub type Result<T, E = Error> = StdResult<T, E>;

233
src/java.rs Normal file
View File

@ -0,0 +1,233 @@
use log::{log, log_enabled, trace, debug, info, warn, error};
use std::sync::{Arc, MutexGuard};
use jni::JNIEnv;
use jni::objects::{JClass, JString, JObject, JValue};
use jni::signature::{JavaType, Primitive};
use jni::sys::jstring;
pub use jni;
use crate as drpc;
const SIG_BOOL: &'static str = "Z";
const SIG_INT: &'static str = "I";
const SIG_LONG: &'static str = "J";
const SIG_NULLABLE_LONG: &'static str = "Ljava/lang/Long;";
const SIG_STRING: &'static str = "Ljava/lang/String;";
const NAME_DISCORD_RPC: &'static str = "com/discord/rpc/DiscordRPC";
lazy_static::lazy_static! {
static ref RUNTIME: Arc<tokio::runtime::Runtime> = Arc::new(tokio::runtime::Runtime::new().expect("unable to create tokio runtime"));
}
fn debug_and_discard_err<T, E: core::fmt::Debug>(result: Result<T, E>) -> Result<T, ()> {
result.map_err(|e| {
error!("{:?}", e);
()
})
}
fn is_null<'b, O>(env: JNIEnv, ref1: O) -> jni::errors::Result<bool> where O: Into<JObject<'b>> {
env.is_same_object(ref1, JObject::null())
}
// taking the [`JNIEnv`] as a reference is needed to satisfy the lifetime checker.
fn get_client<'a>(env: &'a JNIEnv<'a>, obj: JObject<'a>) -> Result<MutexGuard<'a, drpc::Client>, ()> {
if !debug_and_discard_err(env.is_instance_of(obj, NAME_DISCORD_RPC))? {
error!("not an instance of DiscordRPC");
return Err(())
}
let client = env.get_rust_field::<_, _, drpc::Client>(obj, "handle");
debug_and_discard_err(client)
}
#[inline(always)]
pub fn Java_com_discord_rpc_DiscordRPC_create0<'a>(env: JNIEnv<'a>, _class: JClass) -> Result<JObject<'a>, ()> {
let client = drpc::Client::default();
let jobj = debug_and_discard_err(env.alloc_object(NAME_DISCORD_RPC))?;
debug_and_discard_err(env.set_rust_field(jobj, "handle", client))?;
Ok(jobj)
}
#[inline(always)]
pub fn Java_com_discord_rpc_DiscordRPC_connect0(env: JNIEnv, obj: JObject, client_id: JString) -> Result<(), ()> {
let client = get_client(&env, obj)?;
let client_id = debug_and_discard_err(
debug_and_discard_err(
debug_and_discard_err(env.get_string(client_id))?.to_str()
)?.parse::<u64>()
)?;
if let Some(current_client_id) = client.client_id() {
if current_client_id != client_id {
RUNTIME.block_on(async { client.disconnect().await });
}
}
debug_and_discard_err(RUNTIME.block_on(async { client.connect(client_id).await }))?;
Ok(())
}
#[inline(always)]
pub fn Java_com_discord_rpc_DiscordRPC_setActivity0(env: JNIEnv, obj: JObject, j_activity: JObject) -> Result<(), ()> {
let client = get_client(&env, obj)?;
let activity = jobject_to_activity(env, j_activity)?;
debug_and_discard_err(RUNTIME.block_on(async { client.set_activity(activity).await }))?;
Ok(())
}
fn jobject_to_activity(env: JNIEnv, jobject: JObject) -> Result<drpc::models::Activity, ()> {
let j_state = env.get_field(jobject, "state", SIG_STRING).map_err(|_| ())?;
let j_details = env.get_field(jobject, "details", SIG_STRING).map_err(|_| ())?;
let j_instance = env.get_field(jobject, "instance", SIG_BOOL).map_err(|_| ())?;
let j_timestamps = env.get_field(jobject, "timestamps", "Lcom/discord/rpc/ActivityTimestamps;").map_err(|_| ())?;
let j_assets = env.get_field(jobject, "assets", "Lcom/discord/rpc/ActivityAssets;").map_err(|_| ())?;
let j_party = env.get_field(jobject, "party", "Lcom/discord/rpc/ActivityParty;").map_err(|_| ())?;
let j_secrets = env.get_field(jobject, "secrets", "Lcom/discord/rpc/ActivitySecrets;").map_err(|_| ())?;
let mut activity = drpc::models::Activity::new();
if let JValue::Object(obj) = j_state {
if !is_null(env, obj).map_err(|_| ())? {
activity = activity.state(env.get_string(obj.into()).map_err(|_| ())?);
}
}
if let JValue::Object(obj) = j_details {
if !is_null(env, obj).map_err(|_| ())? {
activity = activity.details(env.get_string(obj.into()).map_err(|_| ())?);
}
}
if let JValue::Bool(b) = j_instance {
if b != 0 {
activity = activity.instance(true);
}
}
if let JValue::Object(obj) = j_timestamps {
if !is_null(env, obj).map_err(|_| ())? {
let timestamps = jobject_to_activity_timestamps(env, obj)?;
activity = activity.timestamps(|_|timestamps);
}
}
if let JValue::Object(obj) = j_assets {
if !is_null(env, obj).map_err(|_| ())? {
let assets = jobject_to_activity_assets(env, obj)?;
activity = activity.assets(|_|assets);
}
}
if let JValue::Object(obj) = j_party {
if !is_null(env, obj).map_err(|_| ())? {
let party = jobject_to_activity_party(env, obj)?;
activity = activity.party(|_|party);
}
}
if let JValue::Object(obj) = j_secrets {
if !is_null(env, obj).map_err(|_| ())? {
let secrets = jobject_to_activity_secrets(env, obj)?;
activity = activity.secrets(|_|secrets);
}
}
Ok(activity)
}
fn jobject_to_activity_timestamps(env: JNIEnv, jobject: JObject) -> Result<drpc::models::ActivityTimestamps, ()> {
let j_start = debug_and_discard_err(env.get_field(jobject, "start", SIG_NULLABLE_LONG))?;
let j_end = debug_and_discard_err(env.get_field(jobject, "end", SIG_NULLABLE_LONG))?;
let mut timestamps = drpc::models::ActivityTimestamps::new();
if let JValue::Object(obj) = j_start {
if !is_null(env, obj).map_err(|_| ())? {
if let JValue::Long(l) = debug_and_discard_err(env.call_method_unchecked(obj, (obj, "longValue", "()J"), JavaType::Primitive(Primitive::Long), &[]))? {
timestamps = timestamps.start(l as u64);
}
}
}
if let JValue::Object(obj) = j_end {
if !is_null(env, obj).map_err(|_| ())? {
if let JValue::Long(l) = debug_and_discard_err(env.call_method_unchecked(obj, (obj, "longValue", "()J"), JavaType::Primitive(Primitive::Long), &[]))? {
timestamps = timestamps.end(l as u64);
}
}
}
Ok(timestamps)
}
fn jobject_to_activity_assets(env: JNIEnv, jobject: JObject) -> Result<drpc::models::ActivityAssets, ()> {
let j_lrg_img = env.get_field(jobject, "largeImage", SIG_STRING).map_err(|_| ())?;
let j_lrg_txt = env.get_field(jobject, "largeText", SIG_STRING).map_err(|_| ())?;
let j_sml_img = env.get_field(jobject, "smallImage", SIG_STRING).map_err(|_| ())?;
let j_sml_txt = env.get_field(jobject, "smallText", SIG_STRING).map_err(|_| ())?;
let mut assets = drpc::models::ActivityAssets::new();
if let JValue::Object(obj) = j_lrg_img {
if !is_null(env, obj).map_err(|_| ())? {
assets = assets.large_image(env.get_string(obj.into()).map_err(|_| ())?);
}
}
if let JValue::Object(obj) = j_lrg_txt {
if !is_null(env, obj).map_err(|_| ())? {
assets = assets.large_text(env.get_string(obj.into()).map_err(|_| ())?);
}
}
if let JValue::Object(obj) = j_sml_img {
if !is_null(env, obj).map_err(|_| ())? {
assets = assets.small_image(env.get_string(obj.into()).map_err(|_| ())?);
}
}
if let JValue::Object(obj) = j_sml_txt {
if !is_null(env, obj).map_err(|_| ())? {
assets = assets.small_text(env.get_string(obj.into()).map_err(|_| ())?);
}
}
Ok(assets)
}
fn jobject_to_activity_party(env: JNIEnv, jobject: JObject) -> Result<drpc::models::ActivityParty, ()> {
let j_id = env.get_field(jobject, "id", SIG_INT).map_err(|_| ())?;
let j_min_size = env.get_field(jobject, "minSize", SIG_INT).map_err(|_| ())?;
let j_max_size = env.get_field(jobject, "maxSize", SIG_INT).map_err(|_| ())?;
let mut party = drpc::models::ActivityParty::new();
if let JValue::Int(l) = j_id {
party = party.id(l as u32);
}
if let (JValue::Int(l1), JValue::Int(l2)) = (j_min_size, j_max_size) {
party = party.size((l1 as u32, l2 as u32));
}
Ok(party)
}
fn jobject_to_activity_secrets(env: JNIEnv, jobject: JObject) -> Result<drpc::models::ActivitySecrets, ()> {
let j_join = env.get_field(jobject, "join", SIG_STRING).map_err(|_| ())?;
let j_spectate = env.get_field(jobject, "spectate", SIG_STRING).map_err(|_| ())?;
let j_game = env.get_field(jobject, "game", SIG_STRING).map_err(|_| ())?;
let mut secrets = drpc::models::ActivitySecrets::new();
if let JValue::Object(obj) = j_join {
if !is_null(env, obj).map_err(|_| ())? {
secrets = secrets.join(env.get_string(obj.into()).map_err(|_| ())?);
}
}
if let JValue::Object(obj) = j_spectate {
if !is_null(env, obj).map_err(|_| ())? {
secrets = secrets.spectate(env.get_string(obj.into()).map_err(|_| ())?);
}
}
if let JValue::Object(obj) = j_game {
if !is_null(env, obj).map_err(|_| ())? {
secrets = secrets.game(env.get_string(obj.into()).map_err(|_| ())?);
}
}
Ok(secrets)
}

View File

@ -1,17 +1,9 @@
#[macro_use]
extern crate log;
#[macro_use]
extern crate serde_derive;
extern crate serde;
#[macro_use]
extern crate serde_json;
extern crate byteorder;
extern crate bytes;
extern crate crossbeam_channel;
#[cfg(windows)]
extern crate named_pipe;
extern crate parking_lot;
extern crate uuid;
#[macro_use]
mod macros;
@ -25,3 +17,6 @@ pub use client::Client;
pub use connection::{Connection, SocketConnection};
pub use error::*;
#[cfg(feature = "java-bindings")]
pub mod java;

View File

@ -4,7 +4,7 @@ use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use serde::Serialize;
use serde_json;
use error::{Error, Result};
use crate::error::{Error, Result};
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum OpCode {
@ -15,9 +15,10 @@ pub enum OpCode {
Pong,
}
// FIXME: Use TryFrom trait when stable
impl OpCode {
fn try_from(int: u32) -> Result<Self> {
impl TryFrom<u32> for OpCode {
type Error = Error;
fn try_from(int: u32) -> Result<Self, Self::Error> {
match int {
0 => Ok(OpCode::Handshake),
1 => Ok(OpCode::Frame),

View File

@ -4,7 +4,7 @@ use serde::{de::DeserializeOwned, Serialize};
use serde_json;
use super::{Command, Event, Message};
use utils;
use crate::utils;
#[derive(Debug, PartialEq, Deserialize, Serialize)]
pub struct Payload<T>

View File

@ -3,7 +3,7 @@
use std::default::Default;
use super::shared::PartialUser;
use utils;
use crate::utils;
#[derive(Debug, PartialEq, Deserialize, Serialize)]
pub struct SetActivityArgs {