diff --git a/benches/benchmark.rs b/benches/benchmark.rs index 6658244..8fd0876 100644 --- a/benches/benchmark.rs +++ b/benches/benchmark.rs @@ -120,7 +120,8 @@ fn call_sum_callback(c: &mut Criterion) { } fn call_async_sum_callback(c: &mut Criterion) { - let lua = Lua::new(); + let options = LuaOptions::new().thread_cache_size(1024); + let lua = Lua::new_with(LuaStdLib::ALL_SAFE, options).unwrap(); let callback = lua .create_async_function(|_, (a, b, c): (i64, i64, i64)| async move { task::yield_now().await; @@ -244,7 +245,8 @@ fn call_async_userdata_method(c: &mut Criterion) { } } - let lua = Lua::new(); + let options = LuaOptions::new().thread_cache_size(1024); + let lua = Lua::new_with(LuaStdLib::ALL_SAFE, options).unwrap(); lua.globals().set("userdata", UserData(10)).unwrap(); c.bench_function("call async [userdata method] 10", |b| { diff --git a/src/function.rs b/src/function.rs index cbb1415..4c7c5f0 100644 --- a/src/function.rs +++ b/src/function.rs @@ -126,8 +126,12 @@ impl<'lua> Function<'lua> { R: FromLuaMulti<'lua> + 'fut, { let lua = self.0.lua; - match lua.create_thread(self.clone()) { - Ok(t) => Box::pin(t.into_async(args)), + match lua.create_recycled_thread(self.clone()) { + Ok(t) => { + let mut t = t.into_async(args); + t.set_recyclable(true); + Box::pin(t) + } Err(e) => Box::pin(future::err(e)), } } diff --git a/src/lua.rs b/src/lua.rs index 2234898..a259a8d 100644 --- a/src/lua.rs +++ b/src/lua.rs @@ -92,6 +92,9 @@ struct ExtraData { wrapped_failures_cache: Vec, // Cache of recycled `MultiValue` containers multivalue_cache: Vec>, + // Cache of recycled `Thread`s (coroutines) + #[cfg(feature = "async")] + recycled_thread_cache: Vec, // Index of `Option` userdata on the ref thread #[cfg(feature = "async")] @@ -139,29 +142,51 @@ pub struct LuaOptions { /// [`pcall`]: https://www.lua.org/manual/5.3/manual.html#pdf-pcall /// [`xpcall`]: https://www.lua.org/manual/5.3/manual.html#pdf-xpcall pub catch_rust_panics: bool, + + /// Max size of thread (coroutine) object cache used to execute asynchronous functions. + /// + /// It works only on Lua 5.4 or LuaJIT (vendored) with [`lua_resetthread`] function, + /// and allows to reuse old coroutines with reset state. + /// + /// Default: **0** (disabled) + /// + /// [`lua_resetthread`]: https://www.lua.org/manual/5.4/manual.html#lua_resetthread + #[cfg(feature = "async")] + pub thread_cache_size: usize, } impl Default for LuaOptions { fn default() -> Self { - LuaOptions { - catch_rust_panics: true, - } + LuaOptions::new() } } impl LuaOptions { /// Returns a new instance of `LuaOptions` with default parameters. - pub fn new() -> Self { - Self::default() + pub const fn new() -> Self { + LuaOptions { + catch_rust_panics: true, + #[cfg(feature = "async")] + thread_cache_size: 0, + } } /// Sets [`catch_rust_panics`] option. /// /// [`catch_rust_panics`]: #structfield.catch_rust_panics - pub fn catch_rust_panics(mut self, enabled: bool) -> Self { + pub const fn catch_rust_panics(mut self, enabled: bool) -> Self { self.catch_rust_panics = enabled; self } + + /// Sets [`thread_cache_size`] option. + /// + /// [`thread_cache_size`]: #structfield.thread_cache_size + #[cfg(feature = "async")] + pub const fn thread_cache_size(mut self, size: usize) -> Self { + self.thread_cache_size = size; + self + } } #[cfg(feature = "async")] @@ -181,7 +206,10 @@ impl Drop for Lua { unsafe { if !self.ephemeral { let extra = &mut *self.extra.get(); - for index in extra.wrapped_failures_cache.drain(..) { + let drain_iter = extra.wrapped_failures_cache.drain(..); + #[cfg(feature = "async")] + let drain_iter = drain_iter.chain(extra.recycled_thread_cache.drain(..)); + for index in drain_iter { ffi::lua_pushnil(extra.ref_thread); ffi::lua_replace(extra.ref_thread, index); extra.ref_free.push(index); @@ -402,6 +430,11 @@ impl Lua { ) } + #[cfg(feature = "async")] + if options.thread_cache_size > 0 { + extra.recycled_thread_cache = Vec::with_capacity(options.thread_cache_size); + } + lua } @@ -486,6 +519,8 @@ impl Lua { wrapped_failures_cache: Vec::with_capacity(WRAPPED_FAILURES_CACHE_SIZE), multivalue_cache: Vec::with_capacity(MULTIVALUE_CACHE_SIZE), #[cfg(feature = "async")] + recycled_thread_cache: Vec::new(), + #[cfg(feature = "async")] ref_waker_idx, hook_callback: None, })); @@ -1266,6 +1301,44 @@ impl Lua { } } + /// Wraps a Lua function into a new or recycled thread (coroutine). + #[cfg(feature = "async")] + pub(crate) fn create_recycled_thread<'lua>( + &'lua self, + func: Function<'lua>, + ) -> Result> { + #[cfg(any(feature = "lua54", all(feature = "luajit", feature = "vendored")))] + unsafe { + let _sg = StackGuard::new(self.state); + check_stack(self.state, 1)?; + + let extra = &mut *self.extra.get(); + if let Some(index) = extra.recycled_thread_cache.pop() { + let thread_state = ffi::lua_tothread(extra.ref_thread, index); + self.push_ref(&func.0); + ffi::lua_xmove(self.state, thread_state, 1); + return Ok(Thread(LuaRef { lua: self, index })); + } + }; + self.create_thread(func) + } + + /// Resets thread (coroutine) and returns to the cache for later use. + #[cfg(feature = "async")] + #[cfg(any(feature = "lua54", all(feature = "luajit", feature = "vendored")))] + pub(crate) fn recycle_thread<'lua>(&'lua self, thread: &mut Thread<'lua>) { + unsafe { + let extra = &mut *self.extra.get(); + let thread_state = ffi::lua_tothread(extra.ref_thread, thread.0.index); + if extra.recycled_thread_cache.len() < extra.recycled_thread_cache.capacity() + && ffi::lua_resetthread(self.state, thread_state) == ffi::LUA_OK + { + extra.recycled_thread_cache.push(thread.0.index); + thread.0.index = 0; + } + } + } + /// Create a Lua userdata object from a custom userdata type. pub fn create_userdata(&self, data: T) -> Result where diff --git a/src/thread.rs b/src/thread.rs index b6d5f47..19db1f8 100644 --- a/src/thread.rs +++ b/src/thread.rs @@ -5,7 +5,7 @@ use crate::error::{Error, Result}; use crate::ffi; use crate::types::LuaRef; use crate::util::{check_stack, error_traceback, pop_error, StackGuard}; -use crate::value::{FromLuaMulti, MultiValue, ToLuaMulti}; +use crate::value::{FromLuaMulti, ToLuaMulti}; #[cfg(any(feature = "lua54", all(feature = "luajit", feature = "vendored"), doc))] use crate::function::Function; @@ -14,7 +14,7 @@ use crate::function::Function; use { crate::{ lua::{Lua, ASYNC_POLL_PENDING}, - value::Value, + value::{MultiValue, Value}, }, futures_core::{future::Future, stream::Stream}, std::{ @@ -58,6 +58,7 @@ pub struct AsyncThread<'lua, R> { thread: Thread<'lua>, args0: RefCell>>>, ret: PhantomData, + recycle: bool, } impl<'lua> Thread<'lua> { @@ -260,6 +261,7 @@ impl<'lua> Thread<'lua> { thread: self, args0: RefCell::new(Some(args)), ret: PhantomData, + recycle: false, } } } @@ -270,6 +272,24 @@ impl<'lua> PartialEq for Thread<'lua> { } } +#[cfg(feature = "async")] +impl<'lua, R> AsyncThread<'lua, R> { + #[inline] + pub(crate) fn set_recyclable(&mut self, recyclable: bool) { + self.recycle = recyclable; + } +} + +#[cfg(feature = "async")] +#[cfg(any(feature = "lua54", all(feature = "luajit", feature = "vendored")))] +impl<'lua, R> Drop for AsyncThread<'lua, R> { + fn drop(&mut self) { + if self.recycle { + self.thread.0.lua.recycle_thread(&mut self.thread); + } + } +} + #[cfg(feature = "async")] impl<'lua, R> Stream for AsyncThread<'lua, R> where diff --git a/src/types.rs b/src/types.rs index 799fda4..2cb5af0 100644 --- a/src/types.rs +++ b/src/types.rs @@ -147,7 +147,9 @@ impl<'lua> Clone for LuaRef<'lua> { impl<'lua> Drop for LuaRef<'lua> { fn drop(&mut self) { - self.lua.drop_ref(self) + if self.index > 0 { + self.lua.drop_ref(self); + } } } diff --git a/tests/async.rs b/tests/async.rs index 21762de..096dd22 100644 --- a/tests/async.rs +++ b/tests/async.rs @@ -12,8 +12,8 @@ use futures_timer::Delay; use futures_util::stream::TryStreamExt; use mlua::{ - Error, Function, Lua, MetaMethod, Result, Table, TableExt, Thread, UserData, UserDataMethods, - Value, + Error, Function, Lua, LuaOptions, MetaMethod, Result, StdLib, Table, TableExt, Thread, + UserData, UserDataMethods, Value, }; #[tokio::test] @@ -228,7 +228,8 @@ async fn test_async_thread() -> Result<()> { #[tokio::test] async fn test_async_table() -> Result<()> { - let lua = Lua::new(); + let options = LuaOptions::new().thread_cache_size(4); + let lua = Lua::new_with(StdLib::ALL_SAFE, options)?; let table = lua.create_table()?; table.set("val", 10)?;