kafru is a Rust crate designed for efficient task queue management, inspired by the task queue systems found in other programming environments. The name "kafru" is derived from the "Kafra" system in the popular game Ragnarok Online.
This library offers robust background task execution with parallel processing through multithreading, enabling tasks to run concurrently and efficiently. Additionally, kafru features advanced scheduling capabilities, utilizing cron for precise task timing. It integrates seamlessly with SurrealDB for comprehensive management of queues, metrics, and schedules, ensuring reliable and organized task execution.
- Task Registry: Manage and organize a collection of structs designed for task execution.
- Scheduler: Utilize cron-based scheduling to define when tasks are pushed to the queue, with options to set start and end times.
- Queue: Maintain a list of tasks awaiting execution.
- Worker: Execute tasks from the queue, referencing the task registry to determine the appropriate struct for each task.
- SurrealDB Integration: Store and manage scheduling details, queue information, and metrics using SurrealDB.
cargo add kafru
For testing, you can start SurrealDB using an in-memory database:
cargo test -- --nocapture
surreal start -u kafru_admin -p kafru_password -b 0.0.0.0:4030 --allow-all memory
For running SurrealDB in file mode, refer to the SurrealDB documentation.
The following environment variables can be used to configure the database:
Key | Description | Default Value |
---|---|---|
KAFRU_DB_USERNAME |
The database username. | kafru_admin |
KAFRU_DB_PASSWORD |
The database password. | kafru_password |
KAFRU_DB_PORT |
The port number of the database. | 4030 |
KAFRU_DB_HOST |
The database host or IP address. | 127.0.0.1 |
KAFRU_DB_NAMESPACE |
The database namespace, useful for separating production and testing databases. | kafru |
KAFRU_DB_NAME |
The database name. | kafru_db |
The task registry stores a collection of task structs that contain the logic to be executed.
- Your struct must implement the
TaskHandler
trait. - Define the code to execute within the
async fn run(&self, _params: std::collections::HashMap<String, Value>) -> Result<(), String>
method.
use async_trait::async_trait;
use kafru::task::TaskHandler;
use serde_json::Value;
use std::collections::HashMap;
pub struct MySampleStruct;
#[async_trait]
impl TaskHandler for MySampleStruct {
async fn run(&self, _params: HashMap<String, Value>) -> Result<(), String> {
let x = 1; // Replace this with your logic.
let total = x + 1;
// Perform your task logic here.
Ok(())
}
}
- After defining the task struct, register it in the task registry by providing a name for the struct.
use async_trait::async_trait;
use kafru::task::{TaskHandler, TaskRegistry};
use serde_json::Value;
use std::collections::HashMap;
pub struct MySampleStruct;
#[async_trait]
impl TaskHandler for MySampleStruct {
async fn run(&self, _params: HashMap<String, Value>) -> Result<(), String> {
// Task logic here.
Ok(())
}
}
#[tokio::main]
async fn main() {
let mut task_registry: TaskRegistry = TaskRegistry::new().await;
task_registry.register("mytesthandler".to_string(), || Box::new(MySampleStruct)).await;
}
To execute tasks using both the worker (watcher) and the scheduler, follow these steps:
use std::sync::Arc;
use kafru::manager::Manager;
use kafru::task::TaskRegistry;
use std::sync::Arc;
use crate::database::Db;
// Initialize database instance
let db_instance = Db::new(None).await;
// Wrap the database instance in an Arc
let db: Arc<Db> = Arc::new(db_instance.unwrap());
// Declare the server e.g. MYLOCALHOSTABC
let server: String = "MYLOCALHOSTABC".to_string();
// Declare the author name.
let author: String = "Juan dela Cruz".to_string();
// Initialize the Manager struct.
let mut manager = Manager::new(server.clone(),author.clone()).await;
// Initialize the Task Registry struct and register the task.
let mut task_registry: TaskRegistry = TaskRegistry::new().await;
task_registry.register("mytesthandler".to_string(), || {
Box::new(MySampleStruct {
message: "Hello World".to_string(),
})
}).await;
// Share the task_registry using Arc for thread safety.
let task_registry: Arc<TaskRegistry> = Arc::new(task_registry);
// Run the Worker, specifying the queue name, number of threads, the task registry, task poll interval (in seconds), and a database arc clone.
let _ = manager.worker("worker-default".to_string(), 5, task_registry.clone(), 15, Some(db.clone())).await;
// (Optional) Run the Scheduler by specifying the scheduler name, interval (in seconds), and a database arc clone.
let _ = manager.scheduler("scheduler-default".to_string(), 5, Some(db.clone())).await;
// Optionally, wait for both the worker and scheduler to finish. This will prevent the function from exiting prematurely.
let _ = manager.wait().await;
To send command to the scheduler, queue worker, and task.
use crate::agent::Agent;
use std::sync::Arc;
use crate::database::Db;
use crate::Command;
let author: String = "Juan dela Cruz".to_string();
let server: String = "MYLOCALHOSTABC".to_string();
let agent_name = Agent::to_name(server.clone(),"worker-default-0").await;
let agent_data = agent.get_by_name(agent_name,server.clone()).await?;
let agent_id = agent_data.id.unwrap();
let command = Command::QueueGracefulShutdown;
let message: String = "test dela cruz".to_string();
let result = agent.send_command(agent_id, command, Some(author),Some(message)).await;
assert!(result.is_ok(),"{:?}",result.unwrap_err());
For more examples, you can go to: src/test/
directory.
- Task Registration: Registers a task struct (
MySampleStruct
) in the task registry. - Task Registry: Shared using
Arc
to enable safe concurrent access in a multi-threaded environment. - Worker: Runs tasks from the queue, controlled by parameters such as:
queue_name
: Name of the queue to pull tasks from.num_threads
: Number of threads to handle task execution.task_registry
: Registry that contains the task structs.poll_interval
: Polling interval (in seconds) to check for new tasks.
- Scheduler: (Optional) Runs scheduled tasks at specified intervals, similar to cron jobs.
- Join: Ensures that both the worker and scheduler keep running without the program exiting prematurely.
For detailed documentation, visit docs.rs/kafru.
This project is licensed under the Custom License Agreement. See the LICENSE file for details.
Contributions are welcome! Please refer to the CONTRIBUTING.md file for guidelines.
For questions or feedback, you can reach me at [email protected].