Skip to content

Commit 309449d

Browse files
author
ds
committed
Add initial support for redis-sentinel
1 parent 16d307a commit 309449d

File tree

4 files changed

+260
-0
lines changed

4 files changed

+260
-0
lines changed

relay-config/src/redis.rs

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,19 @@ enum RedisConfigFromFile {
9292
#[serde(flatten)]
9393
options: PartialRedisConfigOptions,
9494
},
95+
96+
/// Connect to a Sentinel cluster.
97+
Sentinel {
98+
// List of redis:// urls of sentinel nodes.
99+
sentinel_nodes: Vec<String>,
100+
101+
/// Name of a monitored master from sentinel cluster.
102+
master_name: String,
103+
104+
/// Additional configuration options for the redis client and a connections pool.
105+
#[serde(flatten)]
106+
options: PartialRedisConfigOptions,
107+
},
95108
}
96109

97110
/// Redis configuration.
@@ -108,6 +121,17 @@ pub enum RedisConfig {
108121
},
109122
/// Connect to a single Redis instance.
110123
Single(SingleRedisConfig),
124+
125+
/// Connect to a Sentinel cluster.
126+
Sentinel {
127+
/// List of redis:// urls of sentinel nodes.
128+
sentinel_nodes: Vec<String>,
129+
/// Name of a monitored master from sentinel cluster.
130+
master_name: String,
131+
/// Options of the Redis config.
132+
#[serde(flatten)]
133+
options: PartialRedisConfigOptions,
134+
},
111135
}
112136

113137
/// Struct that can serialize a string to a single Redis connection.
@@ -155,6 +179,15 @@ impl From<RedisConfigFromFile> for RedisConfig {
155179
RedisConfigFromFile::SingleWithOpts { server, options } => {
156180
Self::Single(SingleRedisConfig::Detailed { server, options })
157181
}
182+
RedisConfigFromFile::Sentinel {
183+
sentinel_nodes,
184+
master_name,
185+
options,
186+
} => Self::Sentinel {
187+
sentinel_nodes,
188+
master_name,
189+
options,
190+
},
158191
}
159192
}
160193
}
@@ -193,6 +226,15 @@ pub enum RedisConfigRef<'a> {
193226
/// Options of the Redis config.
194227
options: RedisConfigOptions,
195228
},
229+
/// Connect to a Sentinel cluster.
230+
Sentinel {
231+
/// Reference to the Redis nodes urls of the cluster.
232+
sentinel_nodes: &'a [String],
233+
/// Name of a monitored master from sentinel cluster.
234+
master_name: &'a str,
235+
/// Options of the Redis config.
236+
options: RedisConfigOptions,
237+
},
196238
}
197239

198240
/// Helper struct bundling connections and options for the various Redis pools.
@@ -253,6 +295,15 @@ pub(super) fn build_redis_config(
253295
server,
254296
options: Default::default(),
255297
},
298+
RedisConfig::Sentinel {
299+
sentinel_nodes,
300+
master_name,
301+
options,
302+
} => RedisConfigRef::Sentinel {
303+
sentinel_nodes,
304+
master_name,
305+
options: build_redis_config_options(options, default_connections),
306+
},
256307
}
257308
}
258309

@@ -521,6 +572,35 @@ max_connections: 20
521572
);
522573
}
523574

575+
#[test]
576+
fn test_redis_sentinel_nodes_opts() {
577+
let yaml = r#"
578+
sentinel_nodes:
579+
- "redis://127.0.0.1:26379"
580+
- "redis://127.0.0.2:26379"
581+
master_name: sentry-redis
582+
max_connections: 10
583+
"#;
584+
585+
let config: RedisConfig = serde_yaml::from_str(yaml)
586+
.expect("Parsed processing redis config: sentinel with options");
587+
588+
assert_eq!(
589+
config,
590+
RedisConfig::Sentinel {
591+
sentinel_nodes: vec![
592+
"redis://127.0.0.1:26379".to_owned(),
593+
"redis://127.0.0.2:26379".to_owned()
594+
],
595+
master_name: "sentry-redis".to_owned(),
596+
options: PartialRedisConfigOptions {
597+
max_connections: Some(10),
598+
..Default::default()
599+
},
600+
}
601+
);
602+
}
603+
524604
#[test]
525605
fn test_redis_cluster_serialize() {
526606
let config = RedisConfig::Cluster {
@@ -634,4 +714,34 @@ max_connections: 20
634714
}
635715
"###);
636716
}
717+
718+
#[test]
719+
fn test_redis_sentinel_serialize() {
720+
let config = RedisConfig::Sentinel {
721+
sentinel_nodes: vec![
722+
"redis://127.0.0.1:26379".to_owned(),
723+
"redis://127.0.0.2:26379".to_owned(),
724+
],
725+
master_name: "sentry-redis".to_owned(),
726+
options: PartialRedisConfigOptions {
727+
max_connections: Some(42),
728+
..Default::default()
729+
},
730+
};
731+
732+
assert_json_snapshot!(config, @r###"
733+
{
734+
"sentinel_nodes": [
735+
"redis://127.0.0.1:26379",
736+
"redis://127.0.0.2:26379"
737+
],
738+
"master_name": "sentry-redis",
739+
"max_connections": 42,
740+
"idle_timeout": 60,
741+
"create_timeout": 3,
742+
"recycle_timeout": 2,
743+
"recycle_check_frequency": 100
744+
}
745+
"###);
746+
}
637747
}

relay-redis/src/pool.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use deadpool::managed::{Manager, Metrics, Object, Pool, RecycleError, RecycleResult};
22
use deadpool_redis::Manager as SingleManager;
33
use deadpool_redis::cluster::Manager as ClusterManager;
4+
use deadpool_redis::sentinel::{Manager as SentinelManager, SentinelServerType};
45
use futures::FutureExt;
56
use std::ops::{Deref, DerefMut};
67

@@ -17,6 +18,9 @@ pub type CustomClusterPool = Pool<CustomClusterManager, CustomClusterConnection>
1718
/// A connection pool for single Redis instance deployments.
1819
pub type CustomSinglePool = Pool<CustomSingleManager, CustomSingleConnection>;
1920

21+
/// A connection pool for single Redis sentinel deployments.
22+
pub type CustomSentinelPool = Pool<CustomSentinelManager, CustomSentinelConnection>;
23+
2024
/// A wrapper for a connection that can be tracked with metadata.
2125
///
2226
/// A connection is considered detached as soon as it is marked as detached and it can't be
@@ -280,3 +284,93 @@ impl From<Object<CustomSingleManager>> for CustomSingleConnection {
280284
Self(conn)
281285
}
282286
}
287+
288+
/// Managed connection to a Redis-master instance
289+
///
290+
/// This manager handles the creation and recycling of Redis connections,
291+
/// ensuring proper connection health through periodic PING checks. It supports
292+
/// multiplexed connections for efficient handling of multiple operations.
293+
pub struct CustomSentinelConnection(Object<CustomSentinelManager>);
294+
295+
impl redis::aio::ConnectionLike for CustomSentinelConnection {
296+
fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
297+
self.0.req_packed_command(cmd)
298+
}
299+
300+
fn req_packed_commands<'a>(
301+
&'a mut self,
302+
cmd: &'a Pipeline,
303+
offset: usize,
304+
count: usize,
305+
) -> RedisFuture<'a, Vec<Value>> {
306+
self.0.req_packed_commands(cmd, offset, count)
307+
}
308+
309+
fn get_db(&self) -> i64 {
310+
self.0.get_db()
311+
}
312+
}
313+
314+
/// Manages Redis-master connections and their lifecycle.
315+
///
316+
/// This manager handles the creation and recycling of Redis connections,
317+
/// ensuring proper connection health through periodic PING checks. It supports
318+
/// multiplexed connections for efficient handling of multiple operations.
319+
pub struct CustomSentinelManager {
320+
manager: SentinelManager,
321+
recycle_check_frequency: usize,
322+
}
323+
324+
impl CustomSentinelManager {
325+
/// Creates a new Sentinel manager for the specified Sentinel nodes.
326+
///
327+
/// The manager will attempt to connect to each node in the provided list and
328+
/// maintain connections to the Redis cluster.
329+
pub fn new<T: IntoConnectionInfo>(
330+
params: Vec<T>,
331+
master_name: String,
332+
recycle_check_frequency: usize,
333+
) -> RedisResult<Self> {
334+
Ok(Self {
335+
manager: SentinelManager::new(params, master_name, None, SentinelServerType::Master)?,
336+
recycle_check_frequency,
337+
})
338+
}
339+
}
340+
341+
impl Manager for CustomSentinelManager {
342+
type Type = TrackedConnection<MultiplexedConnection>;
343+
type Error = RedisError;
344+
345+
async fn create(&self) -> Result<TrackedConnection<MultiplexedConnection>, RedisError> {
346+
self.manager.create().await.map(TrackedConnection::from)
347+
}
348+
349+
async fn recycle(
350+
&self,
351+
conn: &mut TrackedConnection<MultiplexedConnection>,
352+
metrics: &Metrics,
353+
) -> RecycleResult<RedisError> {
354+
// If the connection is marked to be detached, we return and error, signaling that this
355+
// connection must be detached from the pool.
356+
if conn.detach {
357+
return Err(RecycleError::Message(
358+
"the tracked connection was marked as detached".into(),
359+
));
360+
}
361+
362+
// If the interval has been reached, we optimistically assume the connection is active
363+
// without doing an actual `PING`.
364+
if metrics.recycle_count % self.recycle_check_frequency != 0 {
365+
return Ok(());
366+
}
367+
368+
self.manager.recycle(conn, metrics).await
369+
}
370+
}
371+
372+
impl From<Object<CustomSentinelManager>> for CustomSentinelConnection {
373+
fn from(conn: Object<CustomSentinelManager>) -> Self {
374+
Self(conn)
375+
}
376+
}

relay-redis/src/real.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ pub enum AsyncRedisClient {
7979
Cluster(pool::CustomClusterPool),
8080
/// Contains a connection pool to a single Redis instance.
8181
Single(pool::CustomSinglePool),
82+
/// Contains a connection pool to a Redis-master instance.
83+
Sentinel(pool::CustomSentinelPool),
8284
}
8385

8486
impl AsyncRedisClient {
@@ -126,6 +128,37 @@ impl AsyncRedisClient {
126128
Ok(AsyncRedisClient::Single(pool))
127129
}
128130

131+
/// Creates a new connection client for a Redis-master instance.
132+
///
133+
/// This method initializes a connection client that can communicate with multiple Sentinel nodes
134+
/// to retrieve connection to a current Redis-master instance.
135+
/// The client is configured with the specified sentinel servers, master name and options.
136+
///
137+
/// The client uses a custom Sentinel manager that implements a specific connection recycling
138+
/// strategy, ensuring optimal performance and reliability in single-instance environments.
139+
pub fn sentinel<'a>(
140+
sentinels: impl IntoIterator<Item = &'a str>,
141+
master_name: &str,
142+
opts: &RedisConfigOptions,
143+
) -> Result<Self, RedisError> {
144+
let sentinels = sentinels
145+
.into_iter()
146+
.map(|s| s.to_owned())
147+
.collect::<Vec<_>>();
148+
// We use our custom single manager which performs recycling in a different way from the
149+
// default manager.
150+
let manager = pool::CustomSentinelManager::new(
151+
sentinels,
152+
master_name.to_owned(),
153+
opts.recycle_check_frequency,
154+
)
155+
.map_err(RedisError::Redis)?;
156+
157+
let pool = Self::build_pool(manager, opts)?;
158+
159+
Ok(AsyncRedisClient::Sentinel(pool))
160+
}
161+
129162
/// Acquires a connection from the pool.
130163
///
131164
/// Returns a new [`AsyncRedisConnection`] that can be used to execute Redis commands.
@@ -138,6 +171,9 @@ impl AsyncRedisClient {
138171
Self::Single(pool) => {
139172
AsyncRedisConnection::Single(pool.get().await.map_err(RedisError::Pool)?)
140173
}
174+
Self::Sentinel(pool) => {
175+
AsyncRedisConnection::Sentinel(pool.get().await.map_err(RedisError::Pool)?)
176+
}
141177
};
142178

143179
Ok(connection)
@@ -151,6 +187,7 @@ impl AsyncRedisClient {
151187
let status = match self {
152188
Self::Cluster(pool) => pool.status(),
153189
Self::Single(pool) => pool.status(),
190+
Self::Sentinel(pool) => pool.status(),
154191
};
155192

156193
RedisClientStats {
@@ -172,6 +209,9 @@ impl AsyncRedisClient {
172209
Self::Single(pool) => {
173210
pool.retain(|_, metrics| predicate(metrics));
174211
}
212+
Self::Sentinel(pool) => {
213+
pool.retain(|_, metrics| predicate(metrics));
214+
}
175215
}
176216
}
177217

@@ -210,6 +250,7 @@ impl std::fmt::Debug for AsyncRedisClient {
210250
match self {
211251
AsyncRedisClient::Cluster(_) => write!(f, "AsyncRedisPool::Cluster"),
212252
AsyncRedisClient::Single(_) => write!(f, "AsyncRedisPool::Single"),
253+
AsyncRedisClient::Sentinel(_) => write!(f, "AsyncRedisPool::Sentinel"),
213254
}
214255
}
215256
}
@@ -225,13 +266,16 @@ pub enum AsyncRedisConnection {
225266
Cluster(pool::CustomClusterConnection),
226267
/// A connection to a single Redis instance.
227268
Single(pool::CustomSingleConnection),
269+
/// A connection to a single Redis-master instance
270+
Sentinel(pool::CustomSentinelConnection),
228271
}
229272

230273
impl std::fmt::Debug for AsyncRedisConnection {
231274
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
232275
let name = match self {
233276
Self::Cluster(_) => "Cluster",
234277
Self::Single(_) => "Single",
278+
Self::Sentinel(_) => "Sentinel",
235279
};
236280
f.debug_tuple(name).finish()
237281
}
@@ -242,6 +286,7 @@ impl redis::aio::ConnectionLike for AsyncRedisConnection {
242286
match self {
243287
Self::Cluster(conn) => conn.req_packed_command(cmd),
244288
Self::Single(conn) => conn.req_packed_command(cmd),
289+
Self::Sentinel(conn) => conn.req_packed_command(cmd),
245290
}
246291
}
247292

@@ -254,13 +299,15 @@ impl redis::aio::ConnectionLike for AsyncRedisConnection {
254299
match self {
255300
Self::Cluster(conn) => conn.req_packed_commands(cmd, offset, count),
256301
Self::Single(conn) => conn.req_packed_commands(cmd, offset, count),
302+
Self::Sentinel(conn) => conn.req_packed_commands(cmd, offset, count),
257303
}
258304
}
259305

260306
fn get_db(&self) -> i64 {
261307
match self {
262308
Self::Cluster(conn) => conn.get_db(),
263309
Self::Single(conn) => conn.get_db(),
310+
Self::Sentinel(conn) => conn.get_db(),
264311
}
265312
}
266313
}

0 commit comments

Comments
 (0)