Skip to content

Commit 45d1873

Browse files
tamimajzeejersZach SherbondyZach Sherbondy
authored
Bug 1 prefix (#24)
* Debugging Key Prefix Issue. Removed Ioredis types dev dep. * bug-1: strip the prefix prior to binding listeners if using keyPrefix. (#20) * bug-1: strip the prefix prior to binding listeners if using keyPrefix. * Additional strip / prepend logic to account for ioredis xgroup not using keyPrefix. --------- Co-authored-by: Zach Sherbondy <[email protected]> Co-authored-by: Zach Sherbondy <[email protected]> * Removed debug logs. Changed createConsumerGroup debug log to the new stream name with the prefix if existent. Reverted example apps state. * Bumped version to 1.2.0. --------- Co-authored-by: Zach <[email protected]> Co-authored-by: Zach Sherbondy <[email protected]> Co-authored-by: Zach Sherbondy <[email protected]>
1 parent f4920a6 commit 45d1873

File tree

7 files changed

+706
-688
lines changed

7 files changed

+706
-688
lines changed

examples/client-app/package-lock.json

Lines changed: 206 additions & 212 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/users-microservice/package-lock.json

Lines changed: 207 additions & 222 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/users-microservice/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
"@nestjs/schematics": "^9.2.0",
3737
"@nestjs/testing": "^9.4.3",
3838
"@types/express": "^4.17.21",
39-
"@types/ioredis": "^4.28.10",
4039
"@types/jest": "28.1.8",
4140
"@types/node": "^16.18.101",
4241
"@types/supertest": "^2.0.16",

examples/users-microservice/src/main.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ async function bootstrap() {
1111
strategy: new RedisStreamStrategy({
1212
connection: {
1313
url: '0.0.0.0:6379',
14+
// keyPrefix: 'my-key-prefix:',
1415
},
1516
streams: {
1617
block: 5000,

lib/redis.server.ts

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,15 @@ export class RedisStreamStrategy
9696
try {
9797
if (!this.redis) throw new Error('Redis instance not found.');
9898

99-
await this.redis.xgroup('CREATE', stream, consumerGroup, '$', 'MKSTREAM');
99+
const modifiedStreamKey = this.prependPrefix(stream);
100+
101+
await this.redis.xgroup(
102+
'CREATE',
103+
modifiedStreamKey,
104+
consumerGroup,
105+
'$',
106+
'MKSTREAM',
107+
);
100108

101109
return true;
102110
} catch (error) {
@@ -106,7 +114,7 @@ export class RedisStreamStrategy
106114
'Consumer Group "' +
107115
consumerGroup +
108116
'" already exists for stream: ' +
109-
stream,
117+
this.prependPrefix(stream),
110118
);
111119
return true;
112120
} else {
@@ -214,12 +222,13 @@ export class RedisStreamStrategy
214222

215223
private async notifyHandlers(stream: string, messages: any[]) {
216224
try {
217-
const handler = this.streamHandlerMap[stream];
225+
const modifiedStream = this.stripPrefix(stream);
226+
const handler = this.streamHandlerMap[modifiedStream];
218227

219228
await Promise.all(
220229
messages.map(async (message) => {
221230
let ctx = new RedisStreamContext([
222-
stream,
231+
modifiedStream,
223232
message[0], // message id needed for ACK.
224233
this.options?.streams?.consumerGroup,
225234
this.options?.streams?.consumer,
@@ -269,10 +278,14 @@ export class RedisStreamStrategy
269278
'BLOCK',
270279
this.options?.streams?.block || 0,
271280
'STREAMS',
272-
...(Object.keys(this.streamHandlerMap) as string[]), // streams keys
273-
...(Object.keys(this.streamHandlerMap) as string[]).map(
274-
(stream: string) => '>',
275-
), // '>', this is needed for xreadgroup as id.
281+
...(Object.keys(this.streamHandlerMap).map((s) =>
282+
this.stripPrefix(s),
283+
) as string[]), // streams keys
284+
...(
285+
Object.keys(this.streamHandlerMap).map((s) =>
286+
this.stripPrefix(s),
287+
) as string[]
288+
).map((stream: string) => '>'), // '>', this is needed for xreadgroup as id.
276289
);
277290

278291
// if BLOCK time ended, and results are null, listen again.
@@ -285,10 +298,32 @@ export class RedisStreamStrategy
285298

286299
return this.listenOnStreams();
287300
} catch (error) {
301+
console.log('Error in listenOnStreams: ', error);
288302
this.logger.error(error);
289303
}
290304
}
291305

306+
// When the stream handler name is stored in streamHandlerMap, its stored WITH the key prefix, so sending additional redis commands when using the prefix with the existing key will cause a duplicate prefix. This ensures to strip the first occurrence of the prefix when binding listeners.
307+
private stripPrefix(streamHandlerName: string) {
308+
const keyPrefix = this?.redis?.options?.keyPrefix;
309+
if (!keyPrefix || !streamHandlerName.startsWith(keyPrefix)) {
310+
return streamHandlerName;
311+
}
312+
// Replace just the first instance of the substring
313+
return streamHandlerName.replace(keyPrefix, '');
314+
}
315+
316+
// xgroup CREATE command with ioredis does not automatically prefix the keyPrefix, though many other commands do, such as xreadgroup.
317+
// https://github.com/redis/ioredis/issues/1659
318+
private prependPrefix(key: string) {
319+
const keyPrefix = this?.redis?.options?.keyPrefix;
320+
if (keyPrefix && !key.startsWith(keyPrefix)) {
321+
return `${keyPrefix}${key}`;
322+
} else {
323+
return key;
324+
}
325+
}
326+
292327
// for redis instances. need to add mechanism to try to connect back.
293328
public handleError(stream: any) {
294329
stream.on(ERROR_EVENT, (err: any) => {

0 commit comments

Comments
 (0)