Skip to content

Commit fb0e62c

Browse files
committed
refactor: extract push handlers out of the queue
1 parent f24bc83 commit fb0e62c

File tree

3 files changed

+51
-43
lines changed

3 files changed

+51
-43
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 14 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/ty
55
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
66
import { AbortError, ErrorReply, TimeoutDuringMaintanance, TimeoutError } from '../errors';
77
import { MonitorCallback } from '.';
8-
import EventEmitter from 'events';
98
import assert from 'assert';
109

1110
export interface CommandOptions<T = TypeMapping> {
@@ -53,6 +52,13 @@ const RESP2_PUSH_TYPE_MAPPING = {
5352
[RESP_TYPES.SIMPLE_STRING]: Buffer
5453
};
5554

55+
// Try to handle a push notification. Return whether you
56+
// successfully consumed the notification or not. This is
57+
// important in order for the queue to be able to pass the
58+
// notification to another handler if the current one did not
59+
// succeed.
60+
type PushHandler = (pushItems: Array<any>) => boolean;
61+
5662
export default class RedisCommandsQueue {
5763
readonly #respVersion;
5864
readonly #maxLength;
@@ -62,7 +68,8 @@ export default class RedisCommandsQueue {
6268
#chainInExecution: symbol | undefined;
6369
readonly decoder;
6470
readonly #pubSub = new PubSub();
65-
readonly events = new EventEmitter();
71+
72+
#pushHandlers: PushHandler[] = [this.#onPush.bind(this)];
6673

6774
// If this value is set, we are in a maintenance mode.
6875
// This means any existing commands should have their timeout
@@ -112,8 +119,6 @@ export default class RedisCommandsQueue {
112119
return this.#pubSub.isActive;
113120
}
114121

115-
#invalidateCallback?: (key: RedisArgument | null) => unknown;
116-
117122
constructor(
118123
respVersion: RespVersions,
119124
maxLength: number | null | undefined,
@@ -155,6 +160,7 @@ export default class RedisCommandsQueue {
155160
}
156161
return true;
157162
}
163+
return false
158164
}
159165

160166
#getTypeMapping() {
@@ -167,46 +173,16 @@ export default class RedisCommandsQueue {
167173
onErrorReply: err => this.#onErrorReply(err),
168174
//TODO: we can shave off a few cycles by not adding onPush handler at all if CSC is not used
169175
onPush: push => {
170-
if (!this.#onPush(push)) {
171-
// currently only supporting "invalidate" over RESP3 push messages
172-
switch (push[0].toString()) {
173-
case "invalidate": {
174-
if (this.#invalidateCallback) {
175-
if (push[1] !== null) {
176-
for (const key of push[1]) {
177-
this.#invalidateCallback(key);
178-
}
179-
} else {
180-
this.#invalidateCallback(null);
181-
}
182-
}
183-
break;
184-
}
185-
case 'MOVING': {
186-
const [_, afterMs, url] = push;
187-
const [host, port] = url.toString().split(':');
188-
this.events.emit('moving', afterMs, host, Number(port));
189-
break;
190-
}
191-
case 'MIGRATING':
192-
case 'FAILING_OVER': {
193-
this.events.emit('migrating');
194-
break;
195-
}
196-
case 'MIGRATED':
197-
case 'FAILED_OVER': {
198-
this.events.emit('migrated');
199-
break;
200-
}
201-
}
176+
for(const pushHandler of this.#pushHandlers) {
177+
if(pushHandler(push)) return
202178
}
203179
},
204180
getTypeMapping: () => this.#getTypeMapping()
205181
});
206182
}
207183

208-
setInvalidateCallback(callback?: (key: RedisArgument | null) => unknown) {
209-
this.#invalidateCallback = callback;
184+
addPushHandler(handler: PushHandler): void {
185+
this.#pushHandlers.push(handler);
210186
}
211187

212188
async waitForInflightCommandsToComplete(): Promise<void> {

packages/client/lib/client/enterprise-maintenance-manager.ts

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,31 @@ export default class EnterpriseMaintenanceManager extends EventEmitter {
1111
this.#commandsQueue = commandsQueue;
1212
this.#options = options;
1313

14-
this.#commandsQueue.events.on("moving", this.#onMoving);
15-
this.#commandsQueue.events.on("migrating", this.#onMigrating);
16-
this.#commandsQueue.events.on("migrated", this.#onMigrated);
14+
this.#commandsQueue.addPushHandler(this.#onPush);
1715
}
1816

17+
#onPush = (push: Array<any>): boolean => {
18+
switch (push[0].toString()) {
19+
case "MOVING": {
20+
const [_, afterMs, url] = push;
21+
const [host, port] = url.toString().split(":");
22+
this.#onMoving(afterMs, host, Number(port));
23+
return true;
24+
}
25+
case "MIGRATING":
26+
case "FAILING_OVER": {
27+
this.#onMigrating();
28+
return true;
29+
}
30+
case "MIGRATED":
31+
case "FAILED_OVER": {
32+
this.#onMigrated();
33+
return true;
34+
}
35+
}
36+
return false
37+
};
38+
1939
// Queue:
2040
// toWrite [ C D E ]
2141
// waitingForReply [ A B ] - aka In-flight commands
@@ -62,7 +82,7 @@ export default class EnterpriseMaintenanceManager extends EventEmitter {
6282
#onMigrated = async () => {
6383
this.#commandsQueue.setMaintenanceCommandTimeout(undefined);
6484
this.emit("maintenance", undefined);
65-
}
85+
};
6686

6787
#getSocketTimeout(): number | undefined {
6888
return this.#options.gracefulMaintenance?.handleTimeouts === "error"

packages/client/lib/client/index.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,19 @@ export default class RedisClient<
500500
const cscConfig = options.clientSideCache;
501501
this.#clientSideCache = new BasicClientSideCache(cscConfig);
502502
}
503-
this.#queue.setInvalidateCallback(this.#clientSideCache.invalidate.bind(this.#clientSideCache));
503+
this.#queue.addPushHandler((push: Array<any>): boolean => {
504+
if (push[0].toString() !== 'invalidate') return false;
505+
506+
if (push[1] !== null) {
507+
for (const key of push[1]) {
508+
this.#clientSideCache?.invalidate(key)
509+
}
510+
} else {
511+
this.#clientSideCache?.invalidate(null)
512+
}
513+
514+
return true
515+
});
504516
}
505517
}
506518

0 commit comments

Comments
 (0)