1 change: 1 addition & 0 deletions Cargo.toml
@@ -13,6 +13,7 @@ thiserror = "1.0"
libc = "0.2"
crossbeam-utils = "0.8"
serde = "1.0"
crossbeam-channel = "0.5"

[dependencies.mlua]
version = "0.6"
6 changes: 5 additions & 1 deletion examples/bench/init.lua
@@ -1,5 +1,9 @@
local txapi = require('bench')
txapi.start({}) -- will use default
txapi.start({
fibers = 16,
max_batch = 16,
runtime = { type = "cur_thread" },
}) -- unset options fall back to the defaults
-- txapi.start({buffer = 128 })
-- txapi.start({buffer = 128, runtime = { type = "cur_thread" }})
-- txapi.start({buffer = 128, runtime = { type = "multi_thread" }}) -- default
16 changes: 14 additions & 2 deletions examples/bench/src/lib.rs
@@ -5,9 +5,20 @@ use tokio::time::Instant;
use xtm_rust::{run_module, Dispatcher, ModuleConfig};

async fn module_main(dispatcher: Dispatcher) {
let iterations = 10_000_000;
tokio::spawn({
let dispatcher = dispatcher.try_clone().unwrap();
async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
loop {
println!("task_queue: {:>3}", dispatcher.len());
interval.tick().await;
}
}
});

let iterations = 4_000_000;

let worker_n = 6;
let worker_n = 16;
let iterations_per_worker = iterations / worker_n;
let mut workers = Vec::new();

@@ -57,6 +68,7 @@ fn bench(lua: &Lua) -> LuaResult<LuaTable> {
"start",
lua.create_function_mut(|lua, (config,): (LuaValue,)| {
let config: ModuleConfig = lua.from_value(config)?;
println!("{:?}", config);

run_module(module_main, config, lua).map_err(LuaError::external)
})?,
12 changes: 7 additions & 5 deletions src/config.rs
@@ -1,13 +1,14 @@
use serde::{Deserialize, Serialize};
use tokio::runtime;

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct ModuleConfig {
pub buffer: usize,
pub fibers: usize,
pub max_recv_retries: usize,
pub max_batch: usize,
pub coio_timeout: f64,
pub fiber_standby_timeout: f64,
pub runtime: RuntimeConfig,
}

@@ -16,14 +17,15 @@ impl Default for ModuleConfig {
Self {
buffer: 128,
fibers: 16,
max_recv_retries: 100,
coio_timeout: 1.0,
max_batch: 16,
coio_timeout: 0.1,
fiber_standby_timeout: 1.0,
runtime: RuntimeConfig::default(),
}
}
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type")]
pub enum RuntimeConfig {
#[serde(rename(deserialize = "cur_thread"))]
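
For context on how `runtime = { type = "cur_thread" }` from init.lua reaches this enum: `#[serde(tag = "type")]` makes serde dispatch on the `type` field, and `#[serde(default)]` on `ModuleConfig` fills in any unset option. A minimal sketch of the same round trip, using serde_json in place of mlua's `from_value` (both go through serde); the variant identifiers `CurThread`/`MultiThread` are assumptions, since the diff only shows the `cur_thread` rename and init.lua names `multi_thread` as the default:

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type")]
enum RuntimeConfig {
    #[serde(rename(deserialize = "cur_thread"))]
    CurThread, // assumed identifier; only the rename is visible in the diff
    #[serde(rename(deserialize = "multi_thread"))]
    MultiThread, // assumed; "multi_thread" is the default per init.lua's comments
}

fn main() {
    // mirrors the Lua table { type = "cur_thread" }
    let rt: RuntimeConfig = serde_json::from_str(r#"{ "type": "cur_thread" }"#).unwrap();
    println!("{:?}", rt); // CurThread
}
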
147 changes: 147 additions & 0 deletions src/fiber_pool.rs
@@ -0,0 +1,147 @@
use std::{collections::LinkedList, rc::Rc, time::Duration};

use crossbeam_channel::{unbounded, TryRecvError};
use mlua::Lua;
use tarantool::fiber;

use crate::{ChannelError, Executor, ModuleConfig, Task};

struct SchedulerArgs<'a> {
lua: &'a Lua,
executor: Executor,
config: ModuleConfig,
}
fn scheduler_f(args: Box<SchedulerArgs>) -> i32 {
let SchedulerArgs {
lua,
executor,
config:
ModuleConfig {
max_batch,
coio_timeout,
fibers,
fiber_standby_timeout,
..
},
} = *args;

let cond = Rc::new(fiber::Cond::new());
let (tx, rx) = unbounded::<Task>();

let mut workers = LinkedList::new();
for _ in 0..fibers {
let mut worker = fiber::Fiber::new("worker", &mut worker_f);
worker.set_joinable(true);
worker.start(WorkerArgs {
cond: cond.clone(),
lua,
rx: rx.clone(),
fiber_standby_timeout,
});
workers.push_back(worker);
}

let result = loop {
let tasks = match executor.pop_many(max_batch, coio_timeout) {
Ok(tasks) => tasks,
Err(ChannelError::RXChannelClosed) => break Ok(()),
Err(err) => break Err(err),
};

for task in tasks {
tx.send(task).unwrap(); // TODO: add error handling
cond.signal();
}
};

// gracefully kill fibers
drop(tx);
cond.broadcast();

for worker in workers {
worker.join();
}

match result {
Ok(_) => 0,
Err(_) => -1,
}
}

struct WorkerArgs<'a> {
cond: Rc<fiber::Cond>,
lua: &'a Lua,
rx: crossbeam_channel::Receiver<Task>,
fiber_standby_timeout: f64,
}
fn worker_f(args: Box<WorkerArgs>) -> i32 {
let WorkerArgs {
cond,
lua,
rx,
fiber_standby_timeout,
} = *args;
let fiber_standby_timeout = Duration::from_secs_f64(fiber_standby_timeout);

let thread_func = lua
.create_function(move |lua, _: ()| {
loop {
match rx.try_recv() {
Ok(task) => match task(lua) {
Ok(()) => (),
Err(ChannelError::TXChannelClosed) => continue,
Err(err) => break Err(mlua::Error::external(err)),
},
Err(TryRecvError::Disconnected) => break Ok(()),
Err(TryRecvError::Empty) => {
// park until the scheduler signals or the standby timeout elapses;
// reaping idle fibers on timeout is left disabled for now
let _signaled = cond.wait_timeout(fiber_standby_timeout);
}
}
}
})
.unwrap();
let thread = lua.create_thread(thread_func).unwrap();
match thread.resume(()) {
Ok(()) => 0,
Err(_) => -1,
}
}

pub(crate) struct FiberPool<'a> {
lua: &'a Lua,
executor: Executor,
config: ModuleConfig,

scheduler: fiber::Fiber<'a, SchedulerArgs<'a>>,
}

impl<'a> FiberPool<'a> {
pub fn new(lua: &'a Lua, executor: Executor, config: ModuleConfig) -> Self {
let mut scheduler = fiber::Fiber::new("scheduler", &mut scheduler_f);
scheduler.set_joinable(true);
Self {
lua,
executor,
config,
scheduler,
}
}

pub fn run(&mut self) -> std::io::Result<()> {
self.scheduler.start(SchedulerArgs {
lua: self.lua,
executor: self.executor.try_clone()?,
config: self.config.clone(),
});
Ok(())
}

// join() returns only once every Dispatcher has been dropped
pub fn join(&self) {
self.scheduler.join();
}
}
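
The file above is a classic single-scheduler/many-workers fan-out: the scheduler drains the inbound executor queue in batches, forwards tasks over an unbounded crossbeam channel, and wakes idle workers through `fiber::Cond`; dropping the sender is what lets workers exit. A self-contained sketch of the same shutdown-by-disconnect pattern, with OS threads and `recv_timeout` standing in for fibers and `cond.wait_timeout` (illustrative only, not the crate's API):

use std::{thread, time::Duration};
use crossbeam_channel::{unbounded, RecvTimeoutError};

fn main() {
    let (tx, rx) = unbounded::<Box<dyn FnOnce() + Send>>();

    // workers: block with a standby timeout, like worker_f's cond.wait_timeout
    let workers: Vec<_> = (0..4)
        .map(|_| {
            let rx = rx.clone();
            thread::spawn(move || loop {
                match rx.recv_timeout(Duration::from_secs(1)) {
                    Ok(task) => task(),
                    Err(RecvTimeoutError::Timeout) => continue, // standby, try again
                    Err(RecvTimeoutError::Disconnected) => break, // graceful exit
                }
            })
        })
        .collect();

    // scheduler: forward a batch, then drop tx so workers drain and stop
    for i in 0..16 {
        tx.send(Box::new(move || println!("task {}", i))).unwrap();
    }
    drop(tx);

    for w in workers {
        w.join().unwrap();
    }
}
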
34 changes: 4 additions & 30 deletions src/lib.rs
@@ -4,12 +4,11 @@ use std::future::Future;
use crossbeam_utils::thread;
use mlua::Lua;
use tokio::runtime;

pub use txapi::*;
use tarantool::fiber::Fiber;

mod eventfd;
mod txapi;
mod fiber_pool;

mod config;
pub use config::*;
@@ -27,31 +26,8 @@
{
let (dispatcher, executor) = channel(config.buffer)?;

let executor_loop = &mut |args: Box<(&Lua, Executor)>| {
let (lua, executor) = *args;

let thread_func = lua.create_function(move |lua, _: ()| {
Ok(loop {
match executor.exec(lua, config.max_recv_retries, config.coio_timeout) {
Ok(_) => continue,
Err(ChannelError::TXChannelClosed) => continue,
Err(ChannelError::RXChannelClosed) => break 0,
Err(_err) => break -1,
}
})
}).unwrap();
let thread = lua.create_thread(thread_func).unwrap();
thread.resume(()).unwrap()
};

// UNSAFE: fibers must die inside the current function
let mut fibers = Vec::with_capacity(config.fibers);
for _ in 0..config.fibers {
let mut fiber = Fiber::new("xtm", executor_loop);
fiber.set_joinable(true);
fiber.start((lua, executor.try_clone()?));
fibers.push(fiber);
}
let mut fiber_pool = fiber_pool::FiberPool::new(lua, executor, config.clone());
fiber_pool.run()?;

let result = thread::scope(|scope| {
let module_thread = scope
@@ -67,9 +43,7 @@
})
.unwrap();

for fiber in &fibers {
fiber.join();
}
fiber_pool.join();
module_thread.join().unwrap().unwrap()
})
.unwrap();
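
The surrounding `thread::scope` is what lets the module thread borrow `lua` and the executor from the caller's stack frame: crossbeam's scoped threads are guaranteed to be joined before the scope returns. A tiny sketch of that guarantee with a stand-in borrowed value:

use crossbeam_utils::thread;

fn main() {
    let name = String::from("xtm"); // stand-in for the borrowed Lua state

    let result = thread::scope(|scope| {
        // the spawned thread may borrow `name` because the scope
        // joins every thread before `name` can go out of scope
        let module_thread = scope.spawn(|_| format!("module {} finished", name));
        module_thread.join().unwrap()
    })
    .unwrap();

    println!("{}", result);
}
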
32 changes: 23 additions & 9 deletions src/txapi.rs
@@ -7,7 +7,7 @@ use std::os::unix::io::{AsRawFd, RawFd};
use std::io;
use mlua::Lua;

type Task = Box<dyn FnOnce(&Lua) -> Result<(), ChannelError> + Send>;
pub type Task = Box<dyn FnOnce(&Lua) -> Result<(), ChannelError> + Send>;
type TaskSender = async_channel::Sender<Task>;
type TaskReceiver = async_channel::Receiver<Task>;

@@ -86,25 +86,39 @@ impl Executor {
Self { task_rx, eventfd }
}

pub fn exec(&self, lua: &Lua, max_recv_retries: usize, coio_timeout: f64) -> Result<(), ChannelError> {
pub fn exec(&self, lua: &Lua, coio_timeout: f64) -> Result<(), ChannelError> {
loop {
match self.task_rx.try_recv() {
Ok(func) => return func(lua),
Err(TryRecvError::Empty) => (),
Err(TryRecvError::Closed) => return Err(ChannelError::RXChannelClosed),
};

for _ in 0..max_recv_retries {
match self.task_rx.try_recv() {
Ok(func) => return func(lua),
Err(TryRecvError::Empty) => tarantool::fiber::sleep(0.),
Err(TryRecvError::Closed) => return Err(ChannelError::RXChannelClosed),
};
}
let _ = self.eventfd.coio_read(coio_timeout);
}
}

pub fn pop_many(&self, max_tasks: usize, coio_timeout: f64) -> Result<Vec<Task>, ChannelError> {
if self.task_rx.is_empty() {
let _ = self.eventfd.coio_read(coio_timeout);
}

let mut tasks = Vec::with_capacity(max_tasks);
for _ in 0..max_tasks {
match self.task_rx.try_recv() {
Ok(func) => tasks.push(func),
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Closed) => return Err(ChannelError::RXChannelClosed),
};

// stop draining early once at most one task remains in the channel
if self.task_rx.len() <= 1 {
break;
}
}

Ok(tasks)
}

pub fn try_clone(&self) -> io::Result<Self> {
Ok(Self {
task_rx: self.task_rx.clone(),
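
Making `Task` public exposes the unit of work the whole pipeline moves around: a boxed, `Send` closure that the TX thread eventually runs against `&Lua`. A hypothetical sketch of packaging one (the result channel and the single-variant error enum here are illustrative stand-ins, not the crate's Dispatcher internals):

use mlua::Lua;

#[derive(Debug)]
enum ChannelError {
    TXChannelClosed, // stand-in; the real enum has more variants
}

type Task = Box<dyn FnOnce(&Lua) -> Result<(), ChannelError> + Send>;

fn main() {
    let lua = Lua::new();
    let (result_tx, result_rx) = std::sync::mpsc::channel::<i64>();

    // package a unit of work; sending the answer back fails with
    // TXChannelClosed if the requesting side has already gone away
    let task: Task = Box::new(move |lua| {
        let n: i64 = lua.load("return 2 + 2").eval().expect("valid Lua");
        result_tx.send(n).map_err(|_| ChannelError::TXChannelClosed)
    });

    // the executor side pops the task and runs it against &Lua
    task(&lua).unwrap();
    assert_eq!(result_rx.recv().unwrap(), 4);
}
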