Asynq is a simple, reliable, and efficient distributed task queue library written in Rust, backed by Redis, inspired by hibiken/asynq.
🔗 Fully Compatible with Go asynq: This implementation is fully compatible with the Go version of hibiken/asynq, allowing seamless interoperation with Go services.
- ✅ Guaranteed at-least-once execution - Tasks won't be lost
- ⏰ Task scheduling - Support for delayed and scheduled tasks
- 🔄 Automatic retry - Configurable retry policies for failed tasks
- 🛡️ Fault recovery - Automatic task recovery on worker crashes
- 🎯 Priority queues - Support for weighted and strict priority
- ⚡ Low latency - Fast Redis writes with low task enqueue latency
- 🔒 Task deduplication - Support for unique task options
- ⏱️ Timeout control - Per-task timeout and deadline support
- 📦 Task aggregation - Support for batch processing of multiple tasks
- 🔌 Flexible interface - Support for middleware and custom handlers
- ⏸️ Queue pause - Ability to pause/resume specific queues
- 🕒 Periodic tasks - Support for cron-style scheduled tasks
- 🏠 High availability - Support for Redis Cluster
- 🖥️ Web UI - Web-based management interface for queues and tasks
- 🔄 Go compatible - Fully compatible with Go version asynq, can be deployed together
- 🎯 Macro support - Attribute macros for easy handler registration (optional feature)
Add to your Cargo.toml:

```toml
[dependencies]
asynq = { version = "0.1", features = ["json"] }
# Enable macro support (optional)
# asynq = { version = "0.1", features = ["json", "macros"] }
# or use the dev channel
# asynq = { git = "https://github.com/emo-crab/asynq", branch = "main" }
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
```

Enqueue tasks with a Client (producer):

```rust
use asynq::{client::Client, redis::RedisConnectionType, task::Task};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct EmailPayload {
    to: String,
    subject: String,
    body: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create the Redis configuration
    let redis_config = RedisConnectionType::single("redis://127.0.0.1:6379")?;

    // Create the client
    let client = Client::new(redis_config).await?;

    // Build the task
    let payload = EmailPayload {
        to: "user@example.com".to_string(),
        subject: "Welcome!".to_string(),
        body: "Welcome to our service!".to_string(),
    };
    let task = Task::new_with_json("email:send", &payload)?;

    // Enqueue the task
    let task_info = client.enqueue(task).await?;
    println!("Task enqueued with ID: {}", task_info.id);
    Ok(())
}
```

Process tasks with a Server (consumer):

```rust
use asynq::{config::ServerConfig, redis::RedisConnectionType, server::Handler, server::Server, task::Task};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

#[derive(Serialize, Deserialize)]
struct EmailPayload {
    to: String,
    subject: String,
    body: String,
}

struct EmailProcessor;

#[async_trait]
impl Handler for EmailProcessor {
    async fn process_task(&self, task: Task) -> asynq::error::Result<()> {
        match task.get_type() {
            "email:send" => {
                let payload: EmailPayload = task.get_payload_with_json()?;
                println!("Sending email to: {}", payload.to);
                // Implement the actual email-sending logic here
                Ok(())
            }
            _ => Err(asynq::error::Error::other("Unknown task type")),
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Redis configuration
    let redis_config = RedisConnectionType::single("redis://127.0.0.1:6379")?;

    // Configure the queues and their weights
    let mut queues = HashMap::new();
    queues.insert("critical".to_string(), 6);
    queues.insert("default".to_string(), 3);
    queues.insert("low".to_string(), 1);

    // Server configuration
    let config = ServerConfig::new().concurrency(4).queues(queues);

    // Create and start the server
    let mut server = Server::new(redis_config, config).await?;
    server.run(EmailProcessor).await?;
    Ok(())
}
```

ServeMux provides Go-like task routing, automatically dispatching tasks to different handlers based on task type:
```rust
use asynq::{
    config::ServerConfig, redis::RedisConnectionType, serve_mux::ServeMux,
    server::ServerBuilder, task::Task,
};
use std::collections::HashMap;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_config = RedisConnectionType::single("redis://127.0.0.1:6379")?;

    // Create the ServeMux
    let mut mux = ServeMux::new();

    // Register a synchronous handler
    mux.handle_func("email:send", |task: Task| {
        println!("Processing email:send {:?}", task);
        Ok(())
    });

    // Register an asynchronous handler
    mux.handle_async_func("image:resize", |task: Task| async move {
        println!("Processing image:resize {:?}", task);
        // Async processing logic goes here
        Ok(())
    });

    mux.handle_func("payment:process", |task: Task| {
        println!("Processing payment {:?}", task);
        Ok(())
    });

    // Configure the server
    let mut queues = HashMap::new();
    queues.insert("default".to_string(), 3);
    let config = ServerConfig::new().concurrency(4).queues(queues);

    // Create and run the server
    let mut server = ServerBuilder::new()
        .redis_config(redis_config)
        .server_config(config)
        .build()
        .await?;

    // ServeMux implements the Handler trait, so it can be passed directly to server.run()
    server.run(mux).await?;
    Ok(())
}
```

Features:
- 🎯 Automatically route tasks to corresponding handlers based on task type
- ⚡ Support for both synchronous (`handle_func`) and asynchronous (`handle_async_func`) handlers
- 🔄 Fully compatible with Go version ServeMux
- 🛡️ Type-safe with compile-time checking
- 📝 Clean API, easy to use
See examples/servemux_example.rs for more examples.
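The routing behavior described above can be pictured with a small dependency-free sketch. This is only an illustration of type-based dispatch, not the crate's actual implementation; `MiniMux`, `dispatch`, and the toy `Task` type are hypothetical names:

```rust
use std::collections::HashMap;

// A toy task: just a type string and a payload.
struct Task {
    type_name: String,
    payload: Vec<u8>,
}

// Handlers are boxed closures keyed by exact task type,
// mirroring how a mux routes on the task's type.
type HandlerFn = Box<dyn Fn(&Task) -> Result<(), String>>;

struct MiniMux {
    routes: HashMap<String, HandlerFn>,
}

impl MiniMux {
    fn new() -> Self {
        MiniMux { routes: HashMap::new() }
    }

    // Associate a pattern (task type) with a handler closure.
    fn handle_func<F>(&mut self, pattern: &str, f: F)
    where
        F: Fn(&Task) -> Result<(), String> + 'static,
    {
        self.routes.insert(pattern.to_string(), Box::new(f));
    }

    // Look up the handler for the task's type; unknown types are an error.
    fn dispatch(&self, task: &Task) -> Result<(), String> {
        match self.routes.get(&task.type_name) {
            Some(handler) => handler(task),
            None => Err(format!("no handler for {}", task.type_name)),
        }
    }
}

fn main() {
    let mut mux = MiniMux::new();
    mux.handle_func("email:send", |t| {
        println!("sending {} bytes", t.payload.len());
        Ok(())
    });

    let task = Task { type_name: "email:send".to_string(), payload: b"hi".to_vec() };
    assert!(mux.dispatch(&task).is_ok());

    let unknown = Task { type_name: "sms:send".to_string(), payload: vec![] };
    assert!(mux.dispatch(&unknown).is_err());
}
```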
When the macros feature is enabled, you can use attribute macros similar to actix-web's routing macros for cleaner handler definition:
```rust
use asynq::{
    config::ServerConfig,
    redis::RedisConnectionType,
    register_async_handlers, register_handlers,
    serve_mux::ServeMux,
    server::ServerBuilder,
    task::Task,
    task_handler, task_handler_async,
};
use std::collections::HashMap;

// Define handlers with attribute macros
#[task_handler("email:send")]
fn handle_email(task: Task) -> asynq::error::Result<()> {
    println!("Processing email:send");
    Ok(())
}

#[task_handler_async("image:resize")]
async fn handle_image(task: Task) -> asynq::error::Result<()> {
    println!("Processing image:resize");
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_config = RedisConnectionType::single("redis://127.0.0.1:6379")?;

    // Create a ServeMux and register the handlers with the convenience macros
    let mut mux = ServeMux::new();
    register_handlers!(mux, handle_email);
    register_async_handlers!(mux, handle_image);

    // Configure and run the server
    let mut queues = HashMap::new();
    queues.insert("default".to_string(), 3);
    let config = ServerConfig::new().concurrency(4).queues(queues);

    let mut server = ServerBuilder::new()
        .redis_config(redis_config)
        .server_config(config)
        .build()
        .await?;

    server.run(mux).await?;
    Ok(())
}
```

Macro Features:
- 🎯 Declarative syntax: Define handlers with clean attribute syntax
- 📝 Reduced boilerplate: Pattern strings are stored with the function
- 🔧 Convenient registration: Use the `register_handlers!` and `register_async_handlers!` macros
- 🌐 Familiar pattern: Similar to actix-web's `#[get("/path")]` routing macros
See examples/macro_example.rs for a complete example.
```rust
use std::time::Duration;

// Execute after a 5-minute delay
client.enqueue_in(task, Duration::from_secs(300)).await?;
```

```rust
use std::time::Duration;

// Keep the task unique within 1 hour
let unique_task = Task::new_with_json("report:daily", &payload)?;
client.enqueue_unique(unique_task, Duration::from_secs(3600)).await?;
```

```rust
// Add tasks to a group for aggregation
for i in 1..=10 {
    let item_task = Task::new_with_json("batch:process", &serde_json::json!({"item": i}))?;
    client.add_to_group(item_task, "daily_batch").await?;
}
```

```rust
let task = Task::new_with_json("image:resize", &payload)?
    .with_queue("image_processing")              // Target queue
    .with_max_retry(5)                           // Maximum retry attempts
    .with_timeout(Duration::from_secs(300))      // Timeout
    .with_unique_ttl(Duration::from_secs(3600)); // Uniqueness TTL
```

```rust
let mut queues = HashMap::new();
queues.insert("critical".to_string(), 6);  // Highest priority
queues.insert("default".to_string(), 3);   // Medium priority
queues.insert("low".to_string(), 1);       // Low priority

let config = ServerConfig::new()
    .queues(queues)
    .strict_priority(true); // Strict priority mode
```

Asynq uses a modular design with these main components:
```text
asynq/
├── src/
│   ├── lib.rs              # Library entry and public API
│   ├── client.rs           # Client implementation
│   ├── server.rs           # Server implementation
│   ├── serve_mux.rs        # ServeMux routing (compatible with Go servemux.go)
│   ├── processor.rs        # Processor implementation (compatible with Go processor.go)
│   ├── task.rs             # Task data structures
│   ├── error.rs            # Error handling
│   ├── config.rs           # Configuration management
│   ├── redis.rs            # Redis connection management
│   ├── inspector.rs        # Queue inspector
│   └── broker/             # Storage backend abstraction
│       ├── mod.rs          # Broker trait definition
│       └── redis_broker.rs # Redis implementation
├── proto/
│   └── asynq.proto         # Protocol Buffer definitions
└── examples/
    ├── producer.rs          # Producer example
    ├── consumer.rs          # Consumer example
    ├── servemux_example.rs  # ServeMux usage example
    └── processor_example.rs # Processor example
```
- Client: Responsible for enqueueing tasks
- Server: Responsible for dequeuing and processing tasks
- ServeMux: Task routing multiplexer, routes tasks to different handlers by type (compatible with Go servemux.go)
- Processor: Task processor core, handles concurrency control and task execution (compatible with Go asynq processor.go)
- Aggregator: Task aggregator, aggregates tasks from the same group into batch tasks (compatible with Go asynq aggregator.go)
- Broker: Storage backend abstraction, currently supports Redis
- Task: Task data structure containing type, payload, and options
- Handler: Task handler trait that users need to implement
- Inspector: Queue and task inspection and management tool
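The feature list mentions middleware support, and Handler is the trait users implement. The wrapping idea behind middleware can be sketched without the crate; all names here (`Handler` as a boxed closure, `with_logging`) are hypothetical, and the crate's real middleware API may differ:

```rust
// A simplified handler: takes a task type and payload, returns Ok or an error.
type Handler = Box<dyn Fn(&str, &[u8]) -> Result<(), String>>;

// Middleware wraps a handler and returns a new handler,
// here adding simple before/after logging around the inner call.
fn with_logging(inner: Handler) -> Handler {
    Box::new(move |kind, payload| {
        println!("-> {kind} ({} bytes)", payload.len());
        let result = inner(kind, payload);
        println!("<- {kind} ok={}", result.is_ok());
        result
    })
}

fn main() {
    let base: Handler = Box::new(|kind, _payload| {
        if kind == "email:send" {
            Ok(())
        } else {
            Err("unknown task type".to_string())
        }
    });

    // The wrapped handler behaves like the original, plus logging.
    let wrapped = with_logging(base);
    assert!(wrapped("email:send", b"hello").is_ok());
    assert!(wrapped("bogus", b"").is_err());
}
```

The same shape composes: `with_logging(with_metrics(base))` stacks behaviors around one inner handler.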
The Processor module implements a task-processing architecture compatible with Go asynq's processor.go:
- ✅ Semaphore concurrency control: Uses Tokio Semaphore for precise control of concurrent workers
- ✅ Queue priority: Supports both strict priority and weighted priority modes
- ✅ Task timeout: Supports task-level and global timeout settings
- ✅ Graceful shutdown: Waits for all active workers to complete before shutdown
- ✅ Automatic retry: Failed tasks automatically retry with exponential backoff
- ✅ Task archiving: Tasks automatically archived after reaching max retry count
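The exponential backoff mentioned above can be sketched as a pure delay function. This is only illustrative: the exact formula (doubling from a 1-second base, capped at an hour, no jitter) is an assumption for the sketch, not the crate's actual retry schedule:

```rust
use std::time::Duration;

// Illustrative backoff: base * 2^retried, capped at a maximum.
// Real implementations typically add jitter; this only shows the shape.
fn retry_delay(retried: u32) -> Duration {
    let base = Duration::from_secs(1);
    let max = Duration::from_secs(3600);
    // Clamp the shift so the multiplier cannot overflow u32.
    let delay = base.checked_mul(1u32 << retried.min(30)).unwrap_or(max);
    delay.min(max)
}

fn main() {
    assert_eq!(retry_delay(0), Duration::from_secs(1));   // first retry: 1s
    assert_eq!(retry_delay(3), Duration::from_secs(8));   // 1 * 2^3
    assert_eq!(retry_delay(20), Duration::from_secs(3600)); // capped at the max
}
```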
The GroupAggregator module implements task aggregation compatible with Go asynq's aggregator.go:
- ✅ Task grouping: Set group label for tasks using with_group()
- ✅ Batch aggregation: Automatically aggregate tasks from the same group into a single batch task
- ✅ Flexible triggers: Supports three trigger conditions: grace period, max group size, max delay
- ✅ Custom aggregation: Customize aggregation logic via the `GroupAggregator` trait
- ✅ Functional interface: Quickly create aggregators using `GroupAggregatorFunc`
Example usage:

```rust
use asynq::components::aggregator::GroupAggregatorFunc;

// Define the aggregation function
let aggregator = GroupAggregatorFunc::new(|group, tasks| {
    // Merge multiple tasks into a single batch task
    let combined = tasks.iter()
        .map(|t| t.get_payload())
        .collect::<Vec<_>>()
        .join(&b"\n"[..]);
    Task::new("batch:process", &combined)
});

// Set it on the server
server.set_group_aggregator(aggregator);
```

See GROUP_AGGREGATOR.md for more details.
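The three trigger conditions (grace period, max group size, max delay) amount to a single flush decision per group. Here is a dependency-free sketch of that decision, in which `GroupConfig` and `should_flush` are hypothetical names rather than the crate's API:

```rust
use std::time::Duration;

// Hypothetical trigger configuration mirroring the three conditions above.
struct GroupConfig {
    grace_period: Duration, // flush if no new task arrived for this long
    max_size: usize,        // flush once the group holds this many tasks
    max_delay: Duration,    // flush once the oldest task has waited this long
}

// Decide whether a group should be aggregated now.
fn should_flush(
    cfg: &GroupConfig,
    group_size: usize,
    since_last_add: Duration,
    since_first_add: Duration,
) -> bool {
    group_size >= cfg.max_size
        || since_last_add >= cfg.grace_period
        || since_first_add >= cfg.max_delay
}

fn main() {
    let cfg = GroupConfig {
        grace_period: Duration::from_secs(60),
        max_size: 100,
        max_delay: Duration::from_secs(300),
    };
    // Small group, still inside both time windows: keep waiting.
    assert!(!should_flush(&cfg, 10, Duration::from_secs(5), Duration::from_secs(30)));
    // Group reached its max size: flush.
    assert!(should_flush(&cfg, 100, Duration::from_secs(5), Duration::from_secs(30)));
    // Oldest task hit the max delay: flush.
    assert!(should_flush(&cfg, 10, Duration::from_secs(5), Duration::from_secs(300)));
}
```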
```rust
use asynq::config::ServerConfig;
use std::time::Duration;

let config = ServerConfig::new()
    .concurrency(8)                                       // Number of concurrent workers
    .task_check_interval(Duration::from_secs(1))          // Task check interval
    .delayed_task_check_interval(Duration::from_secs(5))  // Delayed task check interval
    .shutdown_timeout(Duration::from_secs(10))            // Shutdown timeout
    .health_check_interval(Duration::from_secs(15))       // Health check interval
    .group_grace_period(Duration::from_secs(60))?         // Group aggregation grace period
    .group_max_delay(Duration::from_secs(300))            // Group max delay
    .group_max_size(100)                                  // Group max size
    .janitor_interval(Duration::from_secs(8))             // Janitor interval
    .janitor_batch_size(50);                              // Janitor batch size
```

```rust
use asynq::redis::RedisConnectionType;

// Single-node configuration
let redis_config = RedisConnectionType::single("redis://127.0.0.1:6379")?;

// Redis Cluster configuration
let nodes = vec![
    "redis://127.0.0.1:6379/",
    "redis://127.0.0.1:6378/",
    "redis://127.0.0.1:6377/",
];
let redis_config = RedisConnectionType::cluster(nodes)?;
```

```rust
use asynq::base::keys::TaskState;
use asynq::inspector::Inspector;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // `broker` is a previously constructed storage backend (see the Broker component)
    let inspector = Inspector::new(broker);

    // Get queue statistics
    let stats = inspector.get_queue_stats("default").await?;
    println!("Pending: {}, Active: {}", stats.pending, stats.active);

    // List tasks
    let tasks = inspector.list_tasks("default", TaskState::Pending, 1, 10).await?;

    // Requeue an archived task
    inspector.requeue_archived_task("default", "task-id").await?;

    // Pause a queue
    inspector.pause_queue("default").await?;
    Ok(())
}
```

- Clone the repository:
```shell
git clone https://github.com/emo-crab/asynq.git
cd asynq
```

- Build the project:

```shell
cargo build
```

- Start Redis:

```shell
docker run -d -p 6379:6379 redis:alpine
```

- Run the examples:

```shell
# Terminal 1: start the consumer
cargo run --example consumer

# Terminal 2: run the producer
cargo run --example producer
```

```shell
# Unit tests
cargo test

# Integration tests (requires Redis)
cargo test --features integration-tests
```

We welcome contributions of all kinds! Please read CONTRIBUTING.md for details.
- Use Rust features and best practices
- Keep the API simple and easy to use
- Provide comprehensive documentation
- Ensure code quality and test coverage
- Follow semantic versioning
This project is dual-licensed under either the MIT License or the GPL.
- Thanks to hibiken/asynq for design inspiration
- Thanks to the Rust community for excellent library support
If you have any questions or suggestions, please:
- Submit an Issue
- Create a Pull Request
- Join our discussions
⭐ If this project helps you, please give us a star!