Merge pull request #15305 from getlarge/fix-improve-rmq-server-pattern-matching

fix(microservices): Revisit RMQ pattern matching with wildcards
This commit is contained in:
Kamil Mysliwiec
2025-07-14 11:40:25 +02:00
committed by GitHub
3 changed files with 169 additions and 13 deletions

View File

@@ -21,6 +21,9 @@ export const RQM_DEFAULT_QUEUE_OPTIONS = {};
export const RQM_DEFAULT_NOACK = true; export const RQM_DEFAULT_NOACK = true;
export const RQM_DEFAULT_PERSISTENT = false; export const RQM_DEFAULT_PERSISTENT = false;
export const RQM_DEFAULT_NO_ASSERT = false; export const RQM_DEFAULT_NO_ASSERT = false;
export const RMQ_SEPARATOR = '.';
export const RMQ_WILDCARD_SINGLE = '*';
export const RMQ_WILDCARD_ALL = '#';
export const ECONNREFUSED = 'ECONNREFUSED'; export const ECONNREFUSED = 'ECONNREFUSED';
export const CONN_ERR = 'CONN_ERR'; export const CONN_ERR = 'CONN_ERR';

View File

@@ -8,6 +8,9 @@ import {
CONNECTION_FAILED_MESSAGE, CONNECTION_FAILED_MESSAGE,
DISCONNECTED_RMQ_MESSAGE, DISCONNECTED_RMQ_MESSAGE,
NO_MESSAGE_HANDLER, NO_MESSAGE_HANDLER,
RMQ_SEPARATOR,
RMQ_WILDCARD_ALL,
RMQ_WILDCARD_SINGLE,
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT, RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
RQM_DEFAULT_NOACK, RQM_DEFAULT_NOACK,
RQM_DEFAULT_NO_ASSERT, RQM_DEFAULT_NO_ASSERT,
@@ -63,7 +66,7 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
protected readonly queue: string; protected readonly queue: string;
protected readonly noAck: boolean; protected readonly noAck: boolean;
protected readonly queueOptions: any; protected readonly queueOptions: any;
protected readonly wildcardHandlers = new Map<RegExp, MessageHandler>(); protected readonly wildcardHandlers = new Map<string, MessageHandler>();
protected pendingEventListeners: Array<{ protected pendingEventListeners: Array<{
event: keyof RmqEvents; event: keyof RmqEvents;
callback: RmqEvents[keyof RmqEvents]; callback: RmqEvents[keyof RmqEvents];
@@ -365,8 +368,8 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
if (this.wildcardHandlers.size === 0) { if (this.wildcardHandlers.size === 0) {
return null; return null;
} }
for (const [regex, handler] of this.wildcardHandlers) { for (const [wildcardPattern, handler] of this.wildcardHandlers) {
if (regex.test(pattern)) { if (this.matchRmqPattern(wildcardPattern, pattern)) {
return handler; return handler;
} }
} }
@@ -392,20 +395,46 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
const handlers = this.getHandlers(); const handlers = this.getHandlers();
handlers.forEach((handler, pattern) => { handlers.forEach((handler, pattern) => {
const regex = this.convertRoutingKeyToRegex(pattern); if (
if (regex) { pattern.includes(RMQ_WILDCARD_ALL) ||
this.wildcardHandlers.set(regex, handler); pattern.includes(RMQ_WILDCARD_SINGLE)
) {
this.wildcardHandlers.set(pattern, handler);
} }
}); });
} }
private convertRoutingKeyToRegex(routingKey: string): RegExp | undefined { private matchRmqPattern(pattern: string, routingKey: string): boolean {
if (!routingKey.includes('#') && !routingKey.includes('*')) { if (!routingKey) {
return; return pattern === RMQ_WILDCARD_ALL;
} }
let regexPattern = routingKey.replace(/\\/g, '\\\\').replace(/\./g, '\\.');
regexPattern = regexPattern.replace(/\*/g, '[^.]+'); const patternSegments = pattern.split(RMQ_SEPARATOR);
regexPattern = regexPattern.replace(/#/g, '.*'); const routingKeySegments = routingKey.split(RMQ_SEPARATOR);
return new RegExp(`^${regexPattern}$`);
const patternSegmentsLength = patternSegments.length;
const routingKeySegmentsLength = routingKeySegments.length;
const lastIndex = patternSegmentsLength - 1;
for (const [i, currentPattern] of patternSegments.entries()) {
const currentRoutingKey = routingKeySegments[i];
if (!currentRoutingKey && !currentPattern) {
continue;
}
if (!currentRoutingKey && currentPattern !== RMQ_WILDCARD_ALL) {
return false;
}
if (currentPattern === RMQ_WILDCARD_ALL) {
return i === lastIndex;
}
if (
currentPattern !== RMQ_WILDCARD_SINGLE &&
currentPattern !== currentRoutingKey
) {
return false;
}
}
return patternSegmentsLength === routingKeySegmentsLength;
} }
} }

View File

@@ -306,4 +306,128 @@ describe('ServerRMQ', () => {
expect(nack.calledWith(message, false, false)).not.to.be.true; expect(nack.calledWith(message, false, false)).not.to.be.true;
}); });
}); });
describe('matchRmqPattern', () => {
let matchRmqPattern: (pattern: string, routingKey: string) => boolean;
beforeEach(() => {
matchRmqPattern = untypedServer.matchRmqPattern.bind(untypedServer);
});
describe('exact matches', () => {
it('should match identical patterns', () => {
expect(matchRmqPattern('user.created', 'user.created')).to.be.true;
expect(matchRmqPattern('order.updated', 'order.updated')).to.be.true;
});
it('should not match different patterns', () => {
expect(matchRmqPattern('user.created', 'user.updated')).to.be.false;
expect(matchRmqPattern('order.created', 'user.created')).to.be.false;
});
it('should handle patterns with $ character (original issue)', () => {
expect(
matchRmqPattern('$internal.plugin.status', '$internal.plugin.status'),
).to.be.true;
expect(
matchRmqPattern(
'$internal.plugin.0.status',
'$internal.plugin.0.status',
),
).to.be.true;
expect(matchRmqPattern('user.$special.event', 'user.$special.event')).to
.be.true;
});
});
describe('single wildcard (*)', () => {
it('should match single segments', () => {
expect(matchRmqPattern('user.*', 'user.created')).to.be.true;
expect(matchRmqPattern('user.*', 'user.updated')).to.be.true;
expect(matchRmqPattern('*.created', 'user.created')).to.be.true;
expect(matchRmqPattern('*.created', 'order.created')).to.be.true;
});
it('should not match when segment counts differ', () => {
expect(matchRmqPattern('user.*', 'user.profile.created')).to.be.false;
expect(matchRmqPattern('*.created', 'user.profile.created')).to.be
.false;
});
it('should handle patterns with $ and *', () => {
expect(
matchRmqPattern(
'$internal.plugin.*.status',
'$internal.plugin.0.status',
),
).to.be.true;
expect(
matchRmqPattern(
'$internal.plugin.*.status',
'$internal.plugin.1.status',
),
).to.be.true;
expect(matchRmqPattern('$internal.*.status', '$internal.plugin.status'))
.to.be.true;
});
it('should handle multiple * wildcards', () => {
expect(matchRmqPattern('*.*.created', 'user.profile.created')).to.be
.true;
expect(matchRmqPattern('*.*.created', 'order.item.created')).to.be.true;
expect(matchRmqPattern('*.*.created', 'user.created')).to.be.false;
});
});
describe('catch all wildcard (#)', () => {
it('should match when # is at the end', () => {
expect(matchRmqPattern('user.#', 'user.created')).to.be.true;
expect(matchRmqPattern('user.#', 'user.profile.created')).to.be.true;
expect(matchRmqPattern('user.#', 'user.profile.details.updated')).to.be
.true;
});
it('should handle patterns with $ and #', () => {
expect(matchRmqPattern('$internal.#', '$internal.plugin.status')).to.be
.true;
expect(matchRmqPattern('$internal.#', '$internal.plugin.0.status')).to
.be.true;
expect(
matchRmqPattern('$internal.plugin.#', '$internal.plugin.0.status'),
).to.be.true;
});
it('should handle # at the beginning', () => {
expect(matchRmqPattern('#', 'user.created')).to.be.true;
expect(matchRmqPattern('#', 'user.profile.created')).to.be.true;
expect(matchRmqPattern('#', 'created')).to.be.true;
});
});
describe('edge cases', () => {
it('should handle empty routing key', () => {
expect(matchRmqPattern('user.created', '')).to.be.false;
expect(matchRmqPattern('*', '')).to.be.false;
expect(matchRmqPattern('#', '')).to.be.true;
});
it('should handle single segments', () => {
expect(matchRmqPattern('user', 'user')).to.be.true;
expect(matchRmqPattern('*', 'user')).to.be.true;
expect(matchRmqPattern('#', 'user')).to.be.true;
});
it('should handle complex $ patterns that previously failed', () => {
expect(
matchRmqPattern(
'$exchange.*.routing.#',
'$exchange.topic.routing.key.test',
),
).to.be.true;
expect(matchRmqPattern('$sys.#', '$sys.broker.clients')).to.be.true;
expect(matchRmqPattern('$SYS.#', '$SYS.broker.load.messages.received'))
.to.be.true;
});
});
});
}); });