Skip to content

Commit b813e95

Browse files
authored
Readonly replica selection (#253)
1 parent 41e8f86 commit b813e95

File tree

7 files changed

+202
-38
lines changed

7 files changed

+202
-38
lines changed

Sources/Valkey/Cluster/ValkeyClusterClient.swift

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,9 @@ public final class ValkeyClusterClient: Sendable {
146146
@inlinable
147147
public func execute<Command: ValkeyCommand>(_ command: Command) async throws -> Command.Response {
148148
let hashSlot = try self.hashSlot(for: command.keysAffected)
149+
let nodeSelection = getNodeSelection(readOnly: command.isReadOnly)
149150
var clientSelector: () async throws -> ValkeyNodeClient = {
150-
try await self.nodeClient(for: hashSlot.map { [$0] } ?? [])
151+
try await self.nodeClient(for: hashSlot.map { [$0] } ?? [], nodeSelection: nodeSelection)
151152
}
152153

153154
var asking = false
@@ -252,9 +253,11 @@ public final class ValkeyClusterClient: Sendable {
252253
_ commands: [any ValkeyCommand]
253254
) async -> [Result<RESPToken, any Error>] {
254255
guard commands.count > 0 else { return [] }
256+
let readOnlyCommand = commands.reduce(true) { $0 && $1.isReadOnly }
257+
let nodeSelection = getNodeSelection(readOnly: readOnlyCommand)
255258
// get a list of nodes and the commands that should be run on them
256259
do {
257-
let nodes = try await self.splitCommandsAcrossNodes(commands: commands)
260+
let nodes = try await self.splitCommandsAcrossNodes(commands: commands, nodeSelection: nodeSelection)
258261
// if this list has one element, then just run the pipeline on that single node
259262
if nodes.count == 1 {
260263
do {
@@ -340,9 +343,10 @@ public final class ValkeyClusterClient: Sendable {
340343
_ commands: Commands
341344
) async throws -> [Result<RESPToken, Error>] where Commands.Element == any ValkeyCommand {
342345
let hashSlot = try self.hashSlot(for: commands.flatMap { $0.keysAffected })
343-
346+
let readOnlyCommand = commands.reduce(true) { $0 && $1.isReadOnly }
347+
let nodeSelection = getNodeSelection(readOnly: readOnlyCommand)
344348
var clientSelector: () async throws -> ValkeyNodeClient = {
345-
try await self.nodeClient(for: hashSlot.map { [$0] } ?? [])
349+
try await self.nodeClient(for: hashSlot.map { [$0] } ?? [], nodeSelection: nodeSelection)
346350
}
347351

348352
var asking = false
@@ -458,20 +462,32 @@ public final class ValkeyClusterClient: Sendable {
458462
///
459463
/// - Parameters:
460464
/// - keys: Keys affected by operation. This is used to choose the cluster node
465+
/// - readOnly: Is this connection only going to be used with readonly commands
461466
/// - isolation: Actor isolation
462467
/// - operation: Closure handling Valkey connection
463468
/// - Returns: Value returned by closure
464469
@inlinable
465470
public func withConnection<Value>(
466471
forKeys keys: some Collection<ValkeyKey>,
472+
readOnly: Bool = false,
467473
isolation: isolated (any Actor)? = #isolation,
468474
operation: (ValkeyConnection) async throws -> sending Value
469475
) async throws -> Value {
470476
let hashSlots = keys.compactMap { HashSlot(key: $0) }
471-
let node = try await self.nodeClient(for: hashSlots)
477+
let nodeSelection = getNodeSelection(readOnly: readOnly)
478+
let node = try await self.nodeClient(for: hashSlots, nodeSelection: nodeSelection)
472479
return try await node.withConnection(isolation: isolation, operation: operation)
473480
}
474481

482+
@inlinable
483+
/* private */ func getNodeSelection(readOnly: Bool) -> ValkeyClusterNodeSelection {
484+
if readOnly {
485+
self.clientConfiguration.readOnlyCommandNodeSelection.clusterNodeSelection
486+
} else {
487+
.primary
488+
}
489+
}
490+
475491
/// Starts running the cluster client.
476492
///
477493
/// This method initiates:
@@ -557,7 +573,10 @@ public final class ValkeyClusterClient: Sendable {
557573
/// These array of indices are then used to create collections of commands to
558574
/// run on each node
559575
@usableFromInline
560-
func splitCommandsAcrossNodes(commands: [any ValkeyCommand]) async throws -> [ValkeyServerAddress: NodeAndCommands].Values {
576+
func splitCommandsAcrossNodes(
577+
commands: [any ValkeyCommand],
578+
nodeSelection: ValkeyClusterNodeSelection
579+
) async throws -> [ValkeyServerAddress: NodeAndCommands].Values {
561580
var nodeMap: [ValkeyServerAddress: NodeAndCommands] = [:]
562581
var index = commands.startIndex
563582
var prevAddress: ValkeyServerAddress? = nil
@@ -570,7 +589,7 @@ public final class ValkeyClusterClient: Sendable {
570589
// Get hash slot for key and add all the commands you have iterated through so far to the
571590
// node associated with that key and break out of loop
572591
let hashSlot = try self.hashSlot(for: keysAffected)
573-
let node = try await self.nodeClient(for: hashSlot.map { [$0] } ?? [])
592+
let node = try await self.nodeClient(for: hashSlot.map { [$0] } ?? [], nodeSelection: nodeSelection)
574593
let address = node.serverAddress
575594
let nodeAndCommands = NodeAndCommands(node: node, commandIndices: .init(commands.startIndex..<index))
576595
nodeMap[address] = nodeAndCommands
@@ -586,7 +605,7 @@ public final class ValkeyClusterClient: Sendable {
586605
if keysAffected.count > 0 {
587606
// If command affects a key get hash slot for key and add command to the node associated with that key
588607
let hashSlot = try self.hashSlot(for: keysAffected)
589-
let node = try await self.nodeClient(for: hashSlot.map { [$0] } ?? [])
608+
let node = try await self.nodeClient(for: hashSlot.map { [$0] } ?? [], nodeSelection: nodeSelection)
590609
prevAddress = node.serverAddress
591610
nodeMap[prevAddress, default: .init(node: node, commandIndices: [])].commandIndices.append(index)
592611
} else {
@@ -597,7 +616,7 @@ public final class ValkeyClusterClient: Sendable {
597616
}
598617
} else {
599618
// if none of the commands affect any keys then choose a random node
600-
let node = try await self.nodeClient(for: [])
619+
let node = try await self.nodeClient(for: [], nodeSelection: nodeSelection)
601620
let address = node.serverAddress
602621
let nodeAndCommands = NodeAndCommands(node: node, commandIndices: .init(commands.startIndex..<index))
603622
nodeMap[address] = nodeAndCommands
@@ -854,14 +873,17 @@ public final class ValkeyClusterClient: Sendable {
854873
/// - `ValkeyClusterError.clusterIsUnavailable` if no healthy nodes are available
855874
/// - `ValkeyClusterError.clusterIsMissingSlotAssignment` if the slot assignment cannot be determined
856875
@inlinable
857-
package func nodeClient(for slots: some (Collection<HashSlot> & Sendable)) async throws -> ValkeyNodeClient {
876+
package func nodeClient(
877+
for slots: some (Collection<HashSlot> & Sendable),
878+
nodeSelection: ValkeyClusterNodeSelection
879+
) async throws -> ValkeyNodeClient {
858880
var retries = 0
859881
while retries < 3 {
860882
defer { retries += 1 }
861883

862884
do {
863885
return try self.stateLock.withLock { state -> ValkeyNodeClient in
864-
try state.poolFastPath(for: slots)
886+
try state.poolFastPath(for: slots, nodeSelection: nodeSelection)
865887
}
866888
} catch let error as ValkeyClusterError where error == .clusterIsUnavailable {
867889
let waiterID = self.nextRequestID()

Sources/Valkey/Cluster/ValkeyClusterClientStateMachine.swift

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -532,14 +532,18 @@ where
532532
}
533533

534534
@inlinable
535-
package func poolFastPath(for slots: some Collection<HashSlot>) throws(ValkeyClusterError) -> ConnectionPool {
535+
package func poolFastPath(
536+
for slots: some Collection<HashSlot>,
537+
nodeSelection: ValkeyClusterNodeSelection
538+
) throws(ValkeyClusterError) -> ConnectionPool {
536539
switch self.clusterState {
537540
case .unavailable:
538541
throw ValkeyClusterError.clusterIsUnavailable
539542

540543
case .degraded(let context):
541544
let shardID = try context.hashSlotShardMap.nodeID(for: slots)
542-
if let pool = self.runningClients[shardID.primary]?.pool {
545+
let nodeID = nodeSelection.select(nodeIDs: shardID)
546+
if let pool = self.runningClients[nodeID]?.pool {
543547
return pool
544548
}
545549
// If we don't have a node for a shard, that means that this shard got created from
@@ -549,7 +553,8 @@ where
549553

550554
case .healthy(let context):
551555
let shardID = try context.hashSlotShardMap.nodeID(for: slots)
552-
if let pool = self.runningClients[shardID.primary]?.pool {
556+
let nodeID = nodeSelection.select(nodeIDs: shardID)
557+
if let pool = self.runningClients[nodeID]?.pool {
553558
return pool
554559
}
555560
// If we don't have a node for a shard, that means that this shard got created from
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
//
2+
// This source file is part of the valkey-swift project
3+
// Copyright (c) 2025 the valkey-swift project authors
4+
//
5+
// See LICENSE.txt for license information
6+
// SPDX-License-Identifier: Apache-2.0
7+
//
8+
9+
@usableFromInline
10+
package enum ValkeyClusterNodeSelection: Sendable {
11+
case primary
12+
case cycleReplicas(Int)
13+
case cycleAllNodes(Int)
14+
15+
/// Select node from node ids
16+
/// - Parameter nodeIDs: Primary and replica nodes
17+
/// - Returns: ID of selected node
18+
@usableFromInline
19+
func select(nodeIDs: ValkeyShardNodeIDs) -> ValkeyNodeID {
20+
switch self {
21+
case .primary:
22+
return nodeIDs.primary
23+
case .cycleReplicas(let index):
24+
guard nodeIDs.replicas.count > 0 else { return nodeIDs.primary }
25+
return nodeIDs.replicas[index % nodeIDs.replicas.count]
26+
case .cycleAllNodes(let index):
27+
let index = index % (nodeIDs.replicas.count + 1)
28+
if index == 0 {
29+
return nodeIDs.primary
30+
} else {
31+
return nodeIDs.replicas[index - 1]
32+
}
33+
}
34+
}
35+
}
36+
37+
@available(valkeySwift 1.0, *)
38+
extension ValkeyClientConfiguration.ReadOnlyCommandNodeSelection {
39+
/// Convert from ``ValkeyClientConfiguration/ReadOnlyCommandNodeSelection`` to node selection
40+
@usableFromInline
41+
var clusterNodeSelection: ValkeyClusterNodeSelection {
42+
switch self.value {
43+
case .primary:
44+
.primary
45+
case .cycleReplicas:
46+
.cycleReplicas(Self.idGenerator.next())
47+
case .cycleAllNodes:
48+
.cycleAllNodes(Self.idGenerator.next())
49+
}
50+
}
51+
52+
static let idGenerator: IDGenerator = .init()
53+
}

Sources/Valkey/Subscriptions/ValkeyClusterClient+subscribe.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ extension ValkeyClusterClient {
2020
isolation: isolated (any Actor)? = #isolation,
2121
_ operation: (ValkeyConnection) async throws -> sending Value
2222
) async throws -> sending Value {
23-
let node = try await self.nodeClient(for: [])
23+
let node = try await self.nodeClient(for: [], nodeSelection: .primary)
2424
let id = node.subscriptionConnectionIDGenerator.next()
2525

2626
let connection = try await withTaskCancellationHandler {

Sources/Valkey/ValkeyClientConfiguration.swift

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,24 @@ public struct ValkeyClientConfiguration: Sendable {
153153
}
154154
}
155155

156+
/// Determine how nodes are chosen for readonly commands
157+
public struct ReadOnlyCommandNodeSelection: Sendable {
158+
enum _Internal {
159+
case primary
160+
case cycleReplicas
161+
case cycleAllNodes
162+
}
163+
164+
let value: _Internal
165+
166+
/// Always use the primary node
167+
public static var primary: Self { .init(value: .primary) }
168+
/// Cycle through replicas
169+
public static var cycleReplicas: Self { .init(value: .cycleReplicas) }
170+
/// Cycle through primary and replicas
171+
public static var cycleAllNodes: Self { .init(value: .cycleAllNodes) }
172+
}
173+
156174
/// The authentication credentials for the connection.
157175
public var authentication: Authentication?
158176
/// The connection pool configuration.
@@ -174,6 +192,14 @@ public struct ValkeyClientConfiguration: Sendable {
174192
/// Database Number to use for the Valkey Connection
175193
public var databaseNumber: Int = 0
176194

195+
/// Determine how we chose nodes for readonly commands
196+
///
197+
/// Cluster by default will redirect commands from replica nodes to the primary node.
198+
/// Setting this value to something other than ``ReadOnlyCommandNodeSelection/primary``
199+
/// will allow replicas to run readonly commands. This will reduce load on your primary
200+
/// nodes but there is a chance you will receive stale data as the replica is not up to date.
201+
public var readOnlyCommandNodeSelection: ReadOnlyCommandNodeSelection
202+
177203
#if DistributedTracingSupport
178204
/// The distributed tracing configuration to use for the Valkey connection.
179205
/// Defaults to using the globally bootstrapped tracer with OpenTelemetry semantic conventions.
@@ -191,6 +217,7 @@ public struct ValkeyClientConfiguration: Sendable {
191217
/// - blockingCommandTimeout: The timeout for a blocking command response.
192218
/// - tls: The TLS configuration.
193219
/// - databaseNumber: The Valkey Database number.
220+
/// - readOnlyCommandNodeSelection: How we choose a node when processing readonly commands
194221
public init(
195222
authentication: Authentication? = nil,
196223
connectionPool: ConnectionPool = .init(),
@@ -199,7 +226,8 @@ public struct ValkeyClientConfiguration: Sendable {
199226
commandTimeout: Duration = .seconds(30),
200227
blockingCommandTimeout: Duration = .seconds(120),
201228
tls: TLS = .disable,
202-
databaseNumber: Int = 0
229+
databaseNumber: Int = 0,
230+
readOnlyCommandNodeSelection: ReadOnlyCommandNodeSelection = .primary
203231
) {
204232
self.authentication = authentication
205233
self.connectionPool = connectionPool
@@ -209,5 +237,6 @@ public struct ValkeyClientConfiguration: Sendable {
209237
self.blockingCommandTimeout = blockingCommandTimeout
210238
self.tls = tls
211239
self.databaseNumber = databaseNumber
240+
self.readOnlyCommandNodeSelection = readOnlyCommandNodeSelection
212241
}
213242
}

0 commit comments

Comments
 (0)