Add thread (coroutine) cache to reset and later reuse to execute async functions.

It works on Lua 5.4 and LuaJIT (vendored) with `lua_resetthread` function.
This commit is contained in:
Alex Orlenko 2021-11-14 12:57:49 +00:00
parent 7efe807199
commit 50f20e0c2c
No known key found for this signature in database
GPG Key ID: 4C150C250863B96D
6 changed files with 119 additions and 17 deletions

View File

@ -120,7 +120,8 @@ fn call_sum_callback(c: &mut Criterion) {
}
fn call_async_sum_callback(c: &mut Criterion) {
let lua = Lua::new();
let options = LuaOptions::new().thread_cache_size(1024);
let lua = Lua::new_with(LuaStdLib::ALL_SAFE, options).unwrap();
let callback = lua
.create_async_function(|_, (a, b, c): (i64, i64, i64)| async move {
task::yield_now().await;
@ -244,7 +245,8 @@ fn call_async_userdata_method(c: &mut Criterion) {
}
}
let lua = Lua::new();
let options = LuaOptions::new().thread_cache_size(1024);
let lua = Lua::new_with(LuaStdLib::ALL_SAFE, options).unwrap();
lua.globals().set("userdata", UserData(10)).unwrap();
c.bench_function("call async [userdata method] 10", |b| {

View File

@ -126,8 +126,12 @@ impl<'lua> Function<'lua> {
R: FromLuaMulti<'lua> + 'fut,
{
let lua = self.0.lua;
match lua.create_thread(self.clone()) {
Ok(t) => Box::pin(t.into_async(args)),
match lua.create_recycled_thread(self.clone()) {
Ok(t) => {
let mut t = t.into_async(args);
t.set_recyclable(true);
Box::pin(t)
}
Err(e) => Box::pin(future::err(e)),
}
}

View File

@ -92,6 +92,9 @@ struct ExtraData {
wrapped_failures_cache: Vec<c_int>,
// Cache of recycled `MultiValue` containers
multivalue_cache: Vec<MultiValue<'static>>,
// Cache of recycled `Thread`s (coroutines)
#[cfg(feature = "async")]
recycled_thread_cache: Vec<c_int>,
// Index of `Option<Waker>` userdata on the ref thread
#[cfg(feature = "async")]
@ -139,29 +142,51 @@ pub struct LuaOptions {
/// [`pcall`]: https://www.lua.org/manual/5.3/manual.html#pdf-pcall
/// [`xpcall`]: https://www.lua.org/manual/5.3/manual.html#pdf-xpcall
pub catch_rust_panics: bool,
/// Max size of thread (coroutine) object cache used to execute asynchronous functions.
///
/// It works only on Lua 5.4 or LuaJIT (vendored) with [`lua_resetthread`] function,
/// and allows to reuse old coroutines with reset state.
///
/// Default: **0** (disabled)
///
/// [`lua_resetthread`]: https://www.lua.org/manual/5.4/manual.html#lua_resetthread
#[cfg(feature = "async")]
pub thread_cache_size: usize,
}
impl Default for LuaOptions {
fn default() -> Self {
LuaOptions {
catch_rust_panics: true,
}
LuaOptions::new()
}
}
impl LuaOptions {
/// Returns a new instance of `LuaOptions` with default parameters.
pub fn new() -> Self {
Self::default()
pub const fn new() -> Self {
LuaOptions {
catch_rust_panics: true,
#[cfg(feature = "async")]
thread_cache_size: 0,
}
}
/// Sets [`catch_rust_panics`] option.
///
/// [`catch_rust_panics`]: #structfield.catch_rust_panics
pub fn catch_rust_panics(mut self, enabled: bool) -> Self {
pub const fn catch_rust_panics(mut self, enabled: bool) -> Self {
self.catch_rust_panics = enabled;
self
}
/// Sets [`thread_cache_size`] option.
///
/// [`thread_cache_size`]: #structfield.thread_cache_size
#[cfg(feature = "async")]
pub const fn thread_cache_size(mut self, size: usize) -> Self {
self.thread_cache_size = size;
self
}
}
#[cfg(feature = "async")]
@ -181,7 +206,10 @@ impl Drop for Lua {
unsafe {
if !self.ephemeral {
let extra = &mut *self.extra.get();
for index in extra.wrapped_failures_cache.drain(..) {
let drain_iter = extra.wrapped_failures_cache.drain(..);
#[cfg(feature = "async")]
let drain_iter = drain_iter.chain(extra.recycled_thread_cache.drain(..));
for index in drain_iter {
ffi::lua_pushnil(extra.ref_thread);
ffi::lua_replace(extra.ref_thread, index);
extra.ref_free.push(index);
@ -402,6 +430,11 @@ impl Lua {
)
}
#[cfg(feature = "async")]
if options.thread_cache_size > 0 {
extra.recycled_thread_cache = Vec::with_capacity(options.thread_cache_size);
}
lua
}
@ -486,6 +519,8 @@ impl Lua {
wrapped_failures_cache: Vec::with_capacity(WRAPPED_FAILURES_CACHE_SIZE),
multivalue_cache: Vec::with_capacity(MULTIVALUE_CACHE_SIZE),
#[cfg(feature = "async")]
recycled_thread_cache: Vec::new(),
#[cfg(feature = "async")]
ref_waker_idx,
hook_callback: None,
}));
@ -1266,6 +1301,44 @@ impl Lua {
}
}
/// Wraps a Lua function into a new or recycled thread (coroutine).
#[cfg(feature = "async")]
pub(crate) fn create_recycled_thread<'lua>(
&'lua self,
func: Function<'lua>,
) -> Result<Thread<'lua>> {
#[cfg(any(feature = "lua54", all(feature = "luajit", feature = "vendored")))]
unsafe {
let _sg = StackGuard::new(self.state);
check_stack(self.state, 1)?;
let extra = &mut *self.extra.get();
if let Some(index) = extra.recycled_thread_cache.pop() {
let thread_state = ffi::lua_tothread(extra.ref_thread, index);
self.push_ref(&func.0);
ffi::lua_xmove(self.state, thread_state, 1);
return Ok(Thread(LuaRef { lua: self, index }));
}
};
self.create_thread(func)
}
/// Resets thread (coroutine) and returns to the cache for later use.
#[cfg(feature = "async")]
#[cfg(any(feature = "lua54", all(feature = "luajit", feature = "vendored")))]
pub(crate) fn recycle_thread<'lua>(&'lua self, thread: &mut Thread<'lua>) {
unsafe {
let extra = &mut *self.extra.get();
let thread_state = ffi::lua_tothread(extra.ref_thread, thread.0.index);
if extra.recycled_thread_cache.len() < extra.recycled_thread_cache.capacity()
&& ffi::lua_resetthread(self.state, thread_state) == ffi::LUA_OK
{
extra.recycled_thread_cache.push(thread.0.index);
thread.0.index = 0;
}
}
}
/// Create a Lua userdata object from a custom userdata type.
pub fn create_userdata<T>(&self, data: T) -> Result<AnyUserData>
where

View File

@ -5,7 +5,7 @@ use crate::error::{Error, Result};
use crate::ffi;
use crate::types::LuaRef;
use crate::util::{check_stack, error_traceback, pop_error, StackGuard};
use crate::value::{FromLuaMulti, MultiValue, ToLuaMulti};
use crate::value::{FromLuaMulti, ToLuaMulti};
#[cfg(any(feature = "lua54", all(feature = "luajit", feature = "vendored"), doc))]
use crate::function::Function;
@ -14,7 +14,7 @@ use crate::function::Function;
use {
crate::{
lua::{Lua, ASYNC_POLL_PENDING},
value::Value,
value::{MultiValue, Value},
},
futures_core::{future::Future, stream::Stream},
std::{
@ -58,6 +58,7 @@ pub struct AsyncThread<'lua, R> {
thread: Thread<'lua>,
args0: RefCell<Option<Result<MultiValue<'lua>>>>,
ret: PhantomData<R>,
recycle: bool,
}
impl<'lua> Thread<'lua> {
@ -260,6 +261,7 @@ impl<'lua> Thread<'lua> {
thread: self,
args0: RefCell::new(Some(args)),
ret: PhantomData,
recycle: false,
}
}
}
@ -270,6 +272,24 @@ impl<'lua> PartialEq for Thread<'lua> {
}
}
#[cfg(feature = "async")]
impl<'lua, R> AsyncThread<'lua, R> {
#[inline]
pub(crate) fn set_recyclable(&mut self, recyclable: bool) {
self.recycle = recyclable;
}
}
#[cfg(feature = "async")]
#[cfg(any(feature = "lua54", all(feature = "luajit", feature = "vendored")))]
impl<'lua, R> Drop for AsyncThread<'lua, R> {
fn drop(&mut self) {
if self.recycle {
self.thread.0.lua.recycle_thread(&mut self.thread);
}
}
}
#[cfg(feature = "async")]
impl<'lua, R> Stream for AsyncThread<'lua, R>
where

View File

@ -147,7 +147,9 @@ impl<'lua> Clone for LuaRef<'lua> {
impl<'lua> Drop for LuaRef<'lua> {
fn drop(&mut self) {
self.lua.drop_ref(self)
if self.index > 0 {
self.lua.drop_ref(self);
}
}
}

View File

@ -12,8 +12,8 @@ use futures_timer::Delay;
use futures_util::stream::TryStreamExt;
use mlua::{
Error, Function, Lua, MetaMethod, Result, Table, TableExt, Thread, UserData, UserDataMethods,
Value,
Error, Function, Lua, LuaOptions, MetaMethod, Result, StdLib, Table, TableExt, Thread,
UserData, UserDataMethods, Value,
};
#[tokio::test]
@ -228,7 +228,8 @@ async fn test_async_thread() -> Result<()> {
#[tokio::test]
async fn test_async_table() -> Result<()> {
let lua = Lua::new();
let options = LuaOptions::new().thread_cache_size(4);
let lua = Lua::new_with(StdLib::ALL_SAFE, options)?;
let table = lua.create_table()?;
table.set("val", 10)?;