Skip to content

Commit ece4638

Browse files
Merge pull request #15305 from getlarge/fix-improve-rmq-server-pattern-matching
fix(microservices): Revisit RMQ pattern matching with wildcards
2 parents 6f90f9b + 732ee78 commit ece4638

File tree

3 files changed

+169
-13
lines changed

3 files changed

+169
-13
lines changed

packages/microservices/constants.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ export const RQM_DEFAULT_QUEUE_OPTIONS = {};
2121
export const RQM_DEFAULT_NOACK = true;
2222
export const RQM_DEFAULT_PERSISTENT = false;
2323
export const RQM_DEFAULT_NO_ASSERT = false;
24+
export const RMQ_SEPARATOR = '.';
25+
export const RMQ_WILDCARD_SINGLE = '*';
26+
export const RMQ_WILDCARD_ALL = '#';
2427

2528
export const ECONNREFUSED = 'ECONNREFUSED';
2629
export const CONN_ERR = 'CONN_ERR';

packages/microservices/server/server-rmq.ts

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ import {
88
CONNECTION_FAILED_MESSAGE,
99
DISCONNECTED_RMQ_MESSAGE,
1010
NO_MESSAGE_HANDLER,
11+
RMQ_SEPARATOR,
12+
RMQ_WILDCARD_ALL,
13+
RMQ_WILDCARD_SINGLE,
1114
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
1215
RQM_DEFAULT_NOACK,
1316
RQM_DEFAULT_NO_ASSERT,
@@ -63,7 +66,7 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
6366
protected readonly queue: string;
6467
protected readonly noAck: boolean;
6568
protected readonly queueOptions: any;
66-
protected readonly wildcardHandlers = new Map<RegExp, MessageHandler>();
69+
protected readonly wildcardHandlers = new Map<string, MessageHandler>();
6770
protected pendingEventListeners: Array<{
6871
event: keyof RmqEvents;
6972
callback: RmqEvents[keyof RmqEvents];
@@ -365,8 +368,8 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
365368
if (this.wildcardHandlers.size === 0) {
366369
return null;
367370
}
368-
for (const [regex, handler] of this.wildcardHandlers) {
369-
if (regex.test(pattern)) {
371+
for (const [wildcardPattern, handler] of this.wildcardHandlers) {
372+
if (this.matchRmqPattern(wildcardPattern, pattern)) {
370373
return handler;
371374
}
372375
}
@@ -392,20 +395,46 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
392395
const handlers = this.getHandlers();
393396

394397
handlers.forEach((handler, pattern) => {
395-
const regex = this.convertRoutingKeyToRegex(pattern);
396-
if (regex) {
397-
this.wildcardHandlers.set(regex, handler);
398+
if (
399+
pattern.includes(RMQ_WILDCARD_ALL) ||
400+
pattern.includes(RMQ_WILDCARD_SINGLE)
401+
) {
402+
this.wildcardHandlers.set(pattern, handler);
398403
}
399404
});
400405
}
401406

402-
private convertRoutingKeyToRegex(routingKey: string): RegExp | undefined {
403-
if (!routingKey.includes('#') && !routingKey.includes('*')) {
404-
return;
407+
private matchRmqPattern(pattern: string, routingKey: string): boolean {
408+
if (!routingKey) {
409+
return pattern === RMQ_WILDCARD_ALL;
410+
}
411+
412+
const patternSegments = pattern.split(RMQ_SEPARATOR);
413+
const routingKeySegments = routingKey.split(RMQ_SEPARATOR);
414+
415+
const patternSegmentsLength = patternSegments.length;
416+
const routingKeySegmentsLength = routingKeySegments.length;
417+
const lastIndex = patternSegmentsLength - 1;
418+
419+
for (const [i, currentPattern] of patternSegments.entries()) {
420+
const currentRoutingKey = routingKeySegments[i];
421+
422+
if (!currentRoutingKey && !currentPattern) {
423+
continue;
424+
}
425+
if (!currentRoutingKey && currentPattern !== RMQ_WILDCARD_ALL) {
426+
return false;
427+
}
428+
if (currentPattern === RMQ_WILDCARD_ALL) {
429+
return i === lastIndex;
430+
}
431+
if (
432+
currentPattern !== RMQ_WILDCARD_SINGLE &&
433+
currentPattern !== currentRoutingKey
434+
) {
435+
return false;
436+
}
405437
}
406-
let regexPattern = routingKey.replace(/\\/g, '\\\\').replace(/\./g, '\\.');
407-
regexPattern = regexPattern.replace(/\*/g, '[^.]+');
408-
regexPattern = regexPattern.replace(/#/g, '.*');
409-
return new RegExp(`^${regexPattern}$`);
438+
return patternSegmentsLength === routingKeySegmentsLength;
410439
}
411440
}

packages/microservices/test/server/server-rmq.spec.ts

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,4 +306,128 @@ describe('ServerRMQ', () => {
306306
expect(nack.calledWith(message, false, false)).not.to.be.true;
307307
});
308308
});
309+
310+
describe('matchRmqPattern', () => {
311+
let matchRmqPattern: (pattern: string, routingKey: string) => boolean;
312+
313+
beforeEach(() => {
314+
matchRmqPattern = untypedServer.matchRmqPattern.bind(untypedServer);
315+
});
316+
317+
describe('exact matches', () => {
318+
it('should match identical patterns', () => {
319+
expect(matchRmqPattern('user.created', 'user.created')).to.be.true;
320+
expect(matchRmqPattern('order.updated', 'order.updated')).to.be.true;
321+
});
322+
323+
it('should not match different patterns', () => {
324+
expect(matchRmqPattern('user.created', 'user.updated')).to.be.false;
325+
expect(matchRmqPattern('order.created', 'user.created')).to.be.false;
326+
});
327+
328+
it('should handle patterns with $ character (original issue)', () => {
329+
expect(
330+
matchRmqPattern('$internal.plugin.status', '$internal.plugin.status'),
331+
).to.be.true;
332+
expect(
333+
matchRmqPattern(
334+
'$internal.plugin.0.status',
335+
'$internal.plugin.0.status',
336+
),
337+
).to.be.true;
338+
expect(matchRmqPattern('user.$special.event', 'user.$special.event')).to
339+
.be.true;
340+
});
341+
});
342+
343+
describe('single wildcard (*)', () => {
344+
it('should match single segments', () => {
345+
expect(matchRmqPattern('user.*', 'user.created')).to.be.true;
346+
expect(matchRmqPattern('user.*', 'user.updated')).to.be.true;
347+
expect(matchRmqPattern('*.created', 'user.created')).to.be.true;
348+
expect(matchRmqPattern('*.created', 'order.created')).to.be.true;
349+
});
350+
351+
it('should not match when segment counts differ', () => {
352+
expect(matchRmqPattern('user.*', 'user.profile.created')).to.be.false;
353+
expect(matchRmqPattern('*.created', 'user.profile.created')).to.be
354+
.false;
355+
});
356+
357+
it('should handle patterns with $ and *', () => {
358+
expect(
359+
matchRmqPattern(
360+
'$internal.plugin.*.status',
361+
'$internal.plugin.0.status',
362+
),
363+
).to.be.true;
364+
expect(
365+
matchRmqPattern(
366+
'$internal.plugin.*.status',
367+
'$internal.plugin.1.status',
368+
),
369+
).to.be.true;
370+
expect(matchRmqPattern('$internal.*.status', '$internal.plugin.status'))
371+
.to.be.true;
372+
});
373+
374+
it('should handle multiple * wildcards', () => {
375+
expect(matchRmqPattern('*.*.created', 'user.profile.created')).to.be
376+
.true;
377+
expect(matchRmqPattern('*.*.created', 'order.item.created')).to.be.true;
378+
expect(matchRmqPattern('*.*.created', 'user.created')).to.be.false;
379+
});
380+
});
381+
382+
describe('catch all wildcard (#)', () => {
383+
it('should match when # is at the end', () => {
384+
expect(matchRmqPattern('user.#', 'user.created')).to.be.true;
385+
expect(matchRmqPattern('user.#', 'user.profile.created')).to.be.true;
386+
expect(matchRmqPattern('user.#', 'user.profile.details.updated')).to.be
387+
.true;
388+
});
389+
390+
it('should handle patterns with $ and #', () => {
391+
expect(matchRmqPattern('$internal.#', '$internal.plugin.status')).to.be
392+
.true;
393+
expect(matchRmqPattern('$internal.#', '$internal.plugin.0.status')).to
394+
.be.true;
395+
expect(
396+
matchRmqPattern('$internal.plugin.#', '$internal.plugin.0.status'),
397+
).to.be.true;
398+
});
399+
400+
it('should handle # at the beginning', () => {
401+
expect(matchRmqPattern('#', 'user.created')).to.be.true;
402+
expect(matchRmqPattern('#', 'user.profile.created')).to.be.true;
403+
expect(matchRmqPattern('#', 'created')).to.be.true;
404+
});
405+
});
406+
407+
describe('edge cases', () => {
408+
it('should handle empty routing key', () => {
409+
expect(matchRmqPattern('user.created', '')).to.be.false;
410+
expect(matchRmqPattern('*', '')).to.be.false;
411+
expect(matchRmqPattern('#', '')).to.be.true;
412+
});
413+
414+
it('should handle single segments', () => {
415+
expect(matchRmqPattern('user', 'user')).to.be.true;
416+
expect(matchRmqPattern('*', 'user')).to.be.true;
417+
expect(matchRmqPattern('#', 'user')).to.be.true;
418+
});
419+
420+
it('should handle complex $ patterns that previously failed', () => {
421+
expect(
422+
matchRmqPattern(
423+
'$exchange.*.routing.#',
424+
'$exchange.topic.routing.key.test',
425+
),
426+
).to.be.true;
427+
expect(matchRmqPattern('$sys.#', '$sys.broker.clients')).to.be.true;
428+
expect(matchRmqPattern('$SYS.#', '$SYS.broker.load.messages.received'))
429+
.to.be.true;
430+
});
431+
});
432+
});
309433
});

0 commit comments

Comments
 (0)