Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/client/lib/client/commands-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,10 @@ export default class RedisCommandsQueue {
return this.#addPubSubCommand(command);
}

removeAllPubSubListeners() {
return this.#pubSub.removeAllListeners();
}

resubscribe(chainId?: symbol) {
const commands = this.#pubSub.resubscribe();
if (!commands.length) return;
Expand Down
11 changes: 8 additions & 3 deletions packages/client/lib/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -765,17 +765,17 @@ export default class RedisClient<
}
});
}

if (this.#clientSideCache) {
commands.push({cmd: this.#clientSideCache.trackingOn()});
}

if (this.#options?.emitInvalidate) {
commands.push({cmd: ['CLIENT', 'TRACKING', 'ON']});
}

const maintenanceHandshakeCmd = await EnterpriseMaintenanceManager.getHandshakeCommand(this.#options);

if(maintenanceHandshakeCmd) {
commands.push(maintenanceHandshakeCmd);
};
Expand Down Expand Up @@ -818,6 +818,11 @@ export default class RedisClient<
chainId = Symbol('Socket Initiator');

const resubscribePromise = this.#queue.resubscribe(chainId);
resubscribePromise?.catch(error => {
if (error.message && error.message.startsWith('MOVED')) {
this.emit('__MOVED', this._self.#queue.removeAllPubSubListeners());
}
});
if (resubscribePromise) {
promises.push(resubscribePromise);
}
Expand Down
41 changes: 30 additions & 11 deletions packages/client/lib/client/pub-sub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,22 +323,25 @@ export class PubSub {
}

resubscribe() {
const commands = [];
const commands: PubSubCommand[] = [];
for (const [type, listeners] of Object.entries(this.listeners)) {
if (!listeners.size) continue;

this.#isActive = true;
this.#subscribing++;

const callback = () => this.#subscribing--;
commands.push({
args: [
COMMANDS[type as PubSubType].subscribe,
...listeners.keys()
],
channelsCounter: listeners.size,
resolve: callback,
reject: callback
} satisfies PubSubCommand);
for(const channel of listeners.keys()) {
this.#subscribing++;
commands.push({
args: [
COMMANDS[type as PubSubType].subscribe,
channel
],
channelsCounter: 1,
resolve: callback,
reject: callback
})
}
}

return commands;
Expand Down Expand Up @@ -379,6 +382,22 @@ export class PubSub {
return listeners;
}

removeAllListeners() {
const result = {
[PUBSUB_TYPE.CHANNELS]: this.listeners[PUBSUB_TYPE.CHANNELS],
[PUBSUB_TYPE.PATTERNS]: this.listeners[PUBSUB_TYPE.PATTERNS],
[PUBSUB_TYPE.SHARDED]: this.listeners[PUBSUB_TYPE.SHARDED]
}

this.#updateIsActive();

this.listeners[PUBSUB_TYPE.CHANNELS] = new Map();
this.listeners[PUBSUB_TYPE.PATTERNS] = new Map();
this.listeners[PUBSUB_TYPE.SHARDED] = new Map();

return result;
}

#emitPubSubMessage(
type: PubSubType,
message: Buffer,
Expand Down
27 changes: 18 additions & 9 deletions packages/client/lib/cluster/cluster-slots.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { RedisClusterClientOptions, RedisClusterOptions } from '.';
import { RootNodesUnavailableError } from '../errors';
import RedisClient, { RedisClientOptions, RedisClientType } from '../client';
import { EventEmitter } from 'node:stream';
import { ChannelListeners, PUBSUB_TYPE, PubSubTypeListeners } from '../client/pub-sub';
import { ChannelListeners, PUBSUB_TYPE, PubSubListeners, PubSubTypeListeners } from '../client/pub-sub';
import { RedisArgument, RedisFunctions, RedisModules, RedisScripts, RespVersions, TypeMapping } from '../RESP/types';
import calculateSlot from 'cluster-key-slot';
import { RedisSocketOptions } from '../client/socket';
Expand Down Expand Up @@ -185,6 +185,7 @@ export default class RedisClusterSlots<
async #discover(rootNode: RedisClusterClientOptions) {
this.clientSideCache?.clear();
this.clientSideCache?.disable();

try {
const addressesInUse = new Set<string>(),
promises: Array<Promise<unknown>> = [],
Expand Down Expand Up @@ -337,23 +338,29 @@ export default class RedisClusterSlots<
const socket =
this.#getNodeAddress(node.address) ??
{ host: node.host, port: node.port, };
const client = Object.freeze({
const clientInfo = Object.freeze({
host: socket.host,
port: socket.port,
});
const emit = this.#emit;
return this.#clientFactory(
const client = this.#clientFactory(
this.#clientOptionsDefaults({
clientSideCache: this.clientSideCache,
RESP: this.#options.RESP,
socket,
readonly,
}))
.on('error', error => emit('node-error', error, client))
.on('reconnecting', () => emit('node-reconnecting', client))
.once('ready', () => emit('node-ready', client))
.once('connect', () => emit('node-connect', client))
.once('end', () => emit('node-disconnect', client));
.on('error', error => emit('node-error', error, clientInfo))
.on('reconnecting', () => emit('node-reconnecting', clientInfo))
.once('ready', () => emit('node-ready', clientInfo))
.once('connect', () => emit('node-connect', clientInfo))
.once('end', () => emit('node-disconnect', clientInfo))
.on('__MOVED', async (allPubSubListeners: PubSubListeners) => {
await this.rediscover(client);
this.#emit('__resubscribeAllPubSubListeners', allPubSubListeners);
});

return client;
}

#createNodeClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>, readonly?: boolean) {
Expand All @@ -374,7 +381,9 @@ export default class RedisClusterSlots<

async rediscover(startWith: RedisClientType<M, F, S, RESP>): Promise<void> {
this.#runningRediscoverPromise ??= this.#rediscover(startWith)
.finally(() => this.#runningRediscoverPromise = undefined);
.finally(() => {
this.#runningRediscoverPromise = undefined
});
return this.#runningRediscoverPromise;
}

Expand Down
30 changes: 29 additions & 1 deletion packages/client/lib/cluster/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { EventEmitter } from 'node:events';
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
import RedisClusterSlots, { NodeAddressMap, ShardNode } from './cluster-slots';
import RedisClusterMultiCommand, { RedisClusterMultiCommandType } from './multi-command';
import { PubSubListener } from '../client/pub-sub';
import { PubSubListener, PubSubListeners } from '../client/pub-sub';
import { ErrorReply } from '../errors';
import { RedisTcpSocketOptions } from '../client/socket';
import { ClientSideCacheConfig, PooledClientSideCacheProvider } from '../client/cache';
Expand Down Expand Up @@ -310,6 +310,7 @@ export default class RedisCluster<

this._options = options;
this._slots = new RedisClusterSlots(options, this.emit.bind(this));
this.on('__resubscribeAllPubSubListeners', this.resubscribeAllPubSubListeners.bind(this));

if (options?.commandOptions) {
this._commandOptions = options.commandOptions;
Expand Down Expand Up @@ -584,6 +585,33 @@ export default class RedisCluster<
);
}

resubscribeAllPubSubListeners(allListeners: PubSubListeners) {
for(const [channel, listeners] of allListeners.CHANNELS) {
listeners.buffers.forEach(bufListener => {
this.subscribe(channel, bufListener, true);
});
listeners.strings.forEach(strListener => {
this.subscribe(channel, strListener);
});
};
for (const [channel, listeners] of allListeners.PATTERNS) {
listeners.buffers.forEach(bufListener => {
this.pSubscribe(channel, bufListener, true);
});
listeners.strings.forEach(strListener => {
this.pSubscribe(channel, strListener);
});
};
for (const [channel, listeners] of allListeners.SHARDED) {
listeners.buffers.forEach(bufListener => {
this.sSubscribe(channel, bufListener, true);
});
listeners.strings.forEach(strListener => {
this.sSubscribe(channel, strListener);
});
};
}

sUnsubscribe = this.SUNSUBSCRIBE;

/**
Expand Down
Loading