Skip to content

Commit 67aa210

Browse files
authored
feat: add Redis result backend (#7)
* feat: implement Redis-backed result backend for task metadata storage - Added RedisBackend struct to manage task metadata using Redis. - Implemented methods to store, retrieve, and delete task metadata. - Introduced configuration options for key prefix and result TTL. - Added an in-memory backend for testing purposes. - Included unit tests to verify functionality of the Redis backend. * fix: remove unused dependencies and update logo in README * docs: update CHANGELOG and README for Redis backend features and example enhancements
1 parent 9d07b19 commit 67aa210

File tree

20 files changed

+866
-93
lines changed

20 files changed

+866
-93
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Added
1111

12+
- Introduced a pluggable result backend API with a Redis implementation, enhanced
13+
`AsyncResult` helpers, and a `redis_results` example with docs.
1214
- Introduced a Redis-backed distributed scheduler backend with configurable lock
1315
management and task state persistence, plus documentation and examples for
1416
running multi-instance beat deployments.
@@ -28,6 +30,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2830
- Removed manual sccache configuration in favor of built-in caching
2931
- Temporarily disabled minimal versions check due to upstream regex-syntax compatibility issue
3032
- Removed global `-D warnings` rustflags to prevent builds failing on non-critical warnings
33+
- Reworked README quick-start and example sections for clearer onboarding.
3134

3235
## [v0.5.5](https://github.com/rusty-celery/rusty-celery/releases/tag/v0.5.5) - 2023-09-25
3336

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ name = "celery_app"
2626
[[example]]
2727
name = "redis_beat"
2828

29+
[[example]]
30+
name = "redis_results"
31+
2932
[dependencies]
3033
base64 = "0.22.1"
3134
chrono = { version = "0.4.42", features = ["serde"] }
@@ -44,7 +47,7 @@ log = "0.4.28"
4447
futures = { version = "0.3.31", features = ["async-await"] }
4548
uuid = { version = "1.18.1", features = ["v4"]}
4649
rand = "0.8"
47-
celery-rs-codegen = { version = "0.6.0", path = "./celery-codegen", optional = true }
50+
celery-rs-codegen = { version = "0.6.1", path = "./celery-codegen", optional = true }
4851
colored = "3.0.0"
4952
once_cell = { version = "1.21.3" }
5053
globset = "0.4.16"

README.md

Lines changed: 66 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ We welcome contributions from everyone regardless of your experience level with
3535

3636
If you already know the basics of Rust but are new to Celery, check out the [Rusty Celery Book](https://rusty-celery.github.io/) or the original Python [Celery Project](http://www.celeryproject.org/).
3737

38-
## Quick start
38+
## Getting Started
39+
40+
### Quick start
3941

4042
Define tasks by decorating functions with the [`task`](https://docs.rs/celery-rs/*/celery/attr.task.html) attribute.
4143

@@ -73,13 +75,60 @@ And consume tasks as a worker from a queue with
7375
my_app.consume().await?;
7476
```
7577

76-
## Examples
78+
### Capturing results
79+
80+
Configure a result backend to persist task state and fetch results from the client side:
81+
82+
```rust
83+
use std::time::Duration;
84+
use celery::backend::RedisBackend;
85+
use celery::prelude::*;
86+
87+
#[celery::task]
88+
fn add(x: i32, y: i32) -> TaskResult<i32> {
89+
Ok(x + y)
90+
}
91+
92+
#[tokio::main]
93+
async fn main() -> anyhow::Result<()> {
94+
let backend = RedisBackend::new("redis://127.0.0.1/0")?
95+
.with_result_ttl(Duration::from_secs(600));
96+
97+
let app = celery::app!(
98+
broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
99+
tasks = [add],
100+
task_routes = [ "*" => "celery" ],
101+
result_backend = backend,
102+
).await?;
103+
104+
let async_result = app.send_task(add::new(1, 2)).await?;
105+
println!("state = {}", async_result.state().await?);
106+
107+
let sum: i32 = async_result.get(Some(Duration::from_secs(10))).await?;
108+
println!("1 + 2 = {sum}");
109+
Ok(())
110+
}
111+
```
112+
113+
[`AsyncResult`](https://docs.rs/celery-rs/latest/celery/task/struct.AsyncResult.html) now
114+
exposes idiomatic helpers: `state()` for the latest `TaskState`, `ready()` to check completion,
115+
and `get(timeout)` to await the final value (raising a `BackendError::Timeout` on expiration).
116+
117+
### Example catalog
77118

78119
The [`examples/`](https://github.com/GaiaNet-AI/celery-rs/tree/main/examples) directory contains:
79120

80121
- a simple Celery app implemented in Rust using an AMQP broker ([`examples/celery_app.rs`](https://github.com/GaiaNet-AI/celery-rs/blob/main/examples/celery_app.rs)),
81122
- the same Celery app implemented in Python ([`examples/celery_app.py`](https://github.com/GaiaNet-AI/celery-rs/blob/main/examples/celery_app.py)),
82-
- and a Beat app implemented in Rust ([`examples/beat_app.rs`](https://github.com/GaiaNet-AI/celery-rs/blob/main/examples/beat_app.rs)).
123+
- a Redis result-backend demo showing AsyncResult usage ([`examples/redis_results.rs`](https://github.com/GaiaNet-AI/celery-rs/blob/main/examples/redis_results.rs)),
124+
- a Beat app implemented in Rust ([`examples/beat_app.rs`](https://github.com/GaiaNet-AI/celery-rs/blob/main/examples/beat_app.rs)),
125+
- and a Redis-backed Beat scheduler with leader election ([`examples/redis_beat.rs`](https://github.com/GaiaNet-AI/celery-rs/blob/main/examples/redis_beat.rs)).
126+
127+
## Running the Examples
128+
129+
Explore the demos interactively (preview below):
130+
131+
![](./img/demo.gif)
83132

84133
### Prerequisites
85134

@@ -93,11 +142,7 @@ Otherwise simply run the helper script:
93142

94143
This will download and run the official [RabbitMQ](https://www.rabbitmq.com/) image (RabbitMQ is a popular AMQP broker).
95144

96-
### Run the examples
97-
98-
![](./img/demo.gif)
99-
100-
#### Run Rust Celery app
145+
### Run the Rust Celery app
101146

102147
You can consume tasks with:
103148

@@ -113,7 +158,7 @@ cargo run --example celery_app produce [task_name]
113158

114159
Current supported tasks for this example are: `add`, `buggy_task`, `long_running_task` and `bound_task`
115160

116-
#### Run Python Celery app
161+
### Run the Python Celery app
117162

118163
Similarly, you can consume or produce tasks from Python by running
119164

@@ -130,7 +175,7 @@ python examples/celery_app.py produce
130175

131176
You'll need to have Python 3 installed, along with the requirements listed in the `requirements.txt` file. You'll also have to provide a task name. This example implements 4 tasks: `add`, `buggy_task`, `long_running_task` and `bound_task`
132177

133-
#### Run Rust Beat app
178+
### Run the Rust Beat app
134179

135180
You can start the Rust beat with:
136181

@@ -140,7 +185,7 @@ cargo run --example beat_app
140185

141186
And then you can consume tasks from Rust or Python as explained above.
142187

143-
#### Redis-backed Beat
188+
### Redis-backed Beat failover
144189

145190
A Redis-powered distributed scheduler backend is available through `RedisSchedulerBackend`.
146191
To try it out locally (requires a Redis server running):
@@ -166,33 +211,18 @@ To test multi-instance failover:
166211
⚠️ = Partially implemented and under active development.<br/>
167212
🔴 = Not supported yet but on-deck to be implemented soon.
168213

169-
### Core
170-
171214
> **Note**: Issue tracking links below reference this repository.
172215
173-
| | Status | Tracking |
174-
| ---------------- |:-------:| --------- |
175-
| Protocol | ⚠️ | [Open issues](https://github.com/GaiaNet-AI/celery-rs/issues?q=is%3Aissue+label%3A%22Protocol%20Feature%22+is%3Aopen) |
176-
| Producers || |
177-
| Consumers || |
178-
| Brokers || |
179-
| Beat || |
180-
| Backends | ⚠️ | |
181-
| Baskets | 🔴 | |
182-
183-
### Brokers
184-
185-
| | Status | Tracking |
186-
| ----- |:------:| -------- |
187-
| AMQP || [Open issues](https://github.com/GaiaNet-AI/celery-rs/issues?q=is%3Aissue+label%3A%22Broker%3A%20AMQP%22+is%3Aopen) |
188-
| Redis || [Open issues](https://github.com/GaiaNet-AI/celery-rs/issues?q=is%3Aissue+label%3A%22Broker%3A%20Redis%22+is%3Aopen) |
189-
190-
### Backends
191-
192-
| | Status | Tracking |
193-
| ----------- |:------:| -------- |
194-
| RPC | 🔴 | [Open issues](https://github.com/GaiaNet-AI/celery-rs/issues?q=is%3Aissue+label%3A%22Backend%3A%20RPC%22+is%3Aopen) |
195-
| Redis || [Open issues](https://github.com/GaiaNet-AI/celery-rs/issues?q=is%3Aissue+label%3A%22Backend%3A%20Redis%22+is%3Aopen) |
216+
| Area | Component | Status | Notes / Tracking |
217+
|-----------|------------|:------:|------------------|
218+
| Core | Protocol | ⚠️ | [Open issues](https://github.com/GaiaNet-AI/celery-rs/issues?q=is%3Aissue+label%3A%22Protocol%20Feature%22+is%3Aopen) |
219+
| Core | Producers || |
220+
| Core | Consumers || |
221+
| Core | Beat || |
222+
| Brokers | AMQP || [Open issues](https://github.com/GaiaNet-AI/celery-rs/issues?q=is%3Aissue+label%3A%22Broker%3A%20AMQP%22+is%3Aopen) |
223+
| Brokers | Redis || [Open issues](https://github.com/GaiaNet-AI/celery-rs/issues?q=is%3Aissue+label%3A%22Broker%3A%20Redis%22+is%3Aopen) |
224+
| Backends | RPC | 🔴 | [Open issues](https://github.com/GaiaNet-AI/celery-rs/issues?q=is%3Aissue+label%3A%22Backend%3A%20RPC%22+is%3Aopen) |
225+
| Backends | Redis || Task results + Beat (0.6.2); [Open issues](https://github.com/GaiaNet-AI/celery-rs/issues?q=is%3Aissue+label%3A%22Backend%3A%20Redis%22+is%3Aopen) |
196226

197227
## Project History and Maintenance
198228

examples/celery_app.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
from celery.bin.celery import main as _main
1010

1111

12-
my_app = Celery("celery", broker=os.environ.get("AMQP_ADDR", "amqp://127.0.0.1:5672"))
12+
# my_app = Celery("celery", broker=os.environ.get("AMQP_ADDR", "amqp://127.0.0.1:5672"))
13+
my_app = Celery("celery", broker=os.environ.get("REDIS_ADDR", "redis://127.0.0.1:6379/0"))
1314
my_app.conf.update(
1415
result_backend=None,
1516
task_ignore_result=True,

examples/redis_results.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
use anyhow::Result;
2+
use celery::prelude::*;
3+
use env_logger::Env;
4+
use structopt::StructOpt;
5+
use tokio::time::Duration;
6+
7+
#[celery::task]
8+
fn add(x: i32, y: i32) -> TaskResult<i32> {
9+
Ok(x + y)
10+
}
11+
12+
#[derive(Debug, StructOpt)]
13+
#[structopt(
14+
name = "redis_results",
15+
about = "Demo: Redis result backend for celery-rs",
16+
setting = structopt::clap::AppSettings::ColoredHelp,
17+
)]
18+
enum ModeOpt {
19+
#[structopt(about = "Start a worker that consumes from the Redis broker")]
20+
Consume,
21+
#[structopt(about = "Send an add task and wait for the result")]
22+
Produce,
23+
}
24+
25+
#[tokio::main]
26+
async fn main() -> Result<()> {
27+
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
28+
29+
let mode = ModeOpt::from_args();
30+
31+
let broker_url =
32+
std::env::var("REDIS_ADDR").unwrap_or_else(|_| "redis://127.0.0.1:6379/".into());
33+
let backend_url =
34+
std::env::var("REDIS_RESULT_ADDR").unwrap_or_else(|_| "redis://127.0.0.1:6379/1".into());
35+
36+
let app = celery::app!(
37+
broker = RedisBroker { broker_url },
38+
tasks = [add],
39+
task_routes = ["*" => "celery"],
40+
result_backend = RedisBackend::new(&backend_url)
41+
.expect("valid Redis result backend")
42+
.with_result_ttl(Duration::from_secs(600)),
43+
)
44+
.await?;
45+
46+
match mode {
47+
ModeOpt::Consume => {
48+
app.display_pretty().await;
49+
app.consume().await?;
50+
}
51+
ModeOpt::Produce => {
52+
let handle = app.send_task(add::new(2, 40)).await?;
53+
println!("Dispatched task {}", handle.task_id());
54+
55+
let sum: i32 = handle.get(Some(Duration::from_secs(10))).await?;
56+
println!("2 + 40 = {sum}");
57+
}
58+
}
59+
60+
app.close().await?;
61+
Ok(())
62+
}

img/celery-rs-logo.png

2.82 MB
Loading

src/app/mod.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use tokio_stream::StreamMap;
1717

1818
mod trace;
1919

20+
use crate::backend::ResultBackend;
2021
use crate::broker::{
2122
broker_builder_from_url, build_and_connect, configure_task_routes, Broker, BrokerBuilder,
2223
Delivery,
@@ -38,6 +39,7 @@ struct Config {
3839
default_queue: String,
3940
task_options: TaskOptions,
4041
task_routes: Vec<(String, String)>,
42+
result_backend: Option<Arc<dyn ResultBackend>>,
4143
}
4244

4345
/// Used to create a [`Celery`] app with a custom configuration.
@@ -67,6 +69,7 @@ impl CeleryBuilder {
6769
default_queue: "celery".into(),
6870
task_options: TaskOptions::default(),
6971
task_routes: vec![],
72+
result_backend: None,
7073
},
7174
}
7275
}
@@ -86,6 +89,15 @@ impl CeleryBuilder {
8689
self
8790
}
8891

92+
/// Configure a result backend implementation for storing task results.
93+
pub fn result_backend<B>(mut self, backend: B) -> Self
94+
where
95+
B: ResultBackend + 'static,
96+
{
97+
self.config.result_backend = Some(Arc::new(backend));
98+
self
99+
}
100+
89101
/// Set the prefetch count. The default value depends on the broker implementation,
90102
/// but it's recommended that you always set this to a value that works best
91103
/// for your application.
@@ -224,6 +236,7 @@ impl CeleryBuilder {
224236
broker_connection_retry: self.config.broker_connection_retry,
225237
broker_connection_max_retries: self.config.broker_connection_max_retries,
226238
broker_connection_retry_delay: self.config.broker_connection_retry_delay,
239+
result_backend: self.config.result_backend.clone(),
227240
})
228241
}
229242
}
@@ -257,9 +270,15 @@ pub struct Celery {
257270
broker_connection_retry: bool,
258271
broker_connection_max_retries: u32,
259272
broker_connection_retry_delay: u32,
273+
result_backend: Option<Arc<dyn ResultBackend>>,
260274
}
261275

262276
impl Celery {
277+
/// Returns a clone of the configured result backend, if any.
278+
pub fn result_backend(&self) -> Option<Arc<dyn ResultBackend>> {
279+
self.result_backend.clone()
280+
}
281+
263282
/// Print a pretty ASCII art logo and configuration settings.
264283
///
265284
/// This is useful and fun to print from a worker application right after
@@ -316,7 +335,10 @@ impl Celery {
316335
queue,
317336
);
318337
self.broker.send(&message, queue).await?;
319-
Ok(AsyncResult::new(message.task_id()))
338+
Ok(AsyncResult::with_backend(
339+
message.task_id(),
340+
self.result_backend(),
341+
))
320342
}
321343

322344
/// Register a task.

0 commit comments

Comments
 (0)