From 823fbab75dfc302e4727f2ad4bd2dad179c15704 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20My=C5=9Bliwiec?= Date: Thu, 30 Jan 2025 13:52:50 +0100 Subject: [PATCH] feat(microservices): add support for topic exchange (rabbitmq) --- .../e2e/topic-exchange-rmq.spec.ts | 38 +++++++ .../src/rmq/topic-exchange-rmq.controller.ts | 36 ++++++ packages/microservices/client/client-rmq.ts | 107 +++++++++++------- .../microservice-configuration.interface.ts | 7 ++ packages/microservices/server/server-rmq.ts | 90 +++++++++++++-- tools/gulp/tasks/move.ts | 6 +- 6 files changed, 227 insertions(+), 57 deletions(-) create mode 100644 integration/microservices/e2e/topic-exchange-rmq.spec.ts create mode 100644 integration/microservices/src/rmq/topic-exchange-rmq.controller.ts diff --git a/integration/microservices/e2e/topic-exchange-rmq.spec.ts b/integration/microservices/e2e/topic-exchange-rmq.spec.ts new file mode 100644 index 000000000..ac64ccd68 --- /dev/null +++ b/integration/microservices/e2e/topic-exchange-rmq.spec.ts @@ -0,0 +1,38 @@ +import { INestApplication } from '@nestjs/common'; +import { MicroserviceOptions, Transport } from '@nestjs/microservices'; +import { Test } from '@nestjs/testing'; +import * as request from 'supertest'; +import { RMQTopicExchangeController } from '../src/rmq/topic-exchange-rmq.controller'; + +describe('RabbitMQ transport (Topic Exchange)', () => { + let server: any; + let app: INestApplication; + + beforeEach(async () => { + const module = await Test.createTestingModule({ + controllers: [RMQTopicExchangeController], + }).compile(); + + app = module.createNestApplication(); + server = app.getHttpAdapter().getInstance(); + + app.connectMicroservice({ + transport: Transport.RMQ, + options: { + urls: [`amqp://0.0.0.0:5672`], + queue: 'test', + topicExchange: 'test', + }, + }); + await app.startAllMicroservices(); + await app.init(); + }); + + it(`should send message to wildcard topic exchange`, () => { + return request(server).get('/topic-exchange').expect(200, 'wildcard.a.b'); + }); + + afterEach(async () => { + await app.close(); + }); +}); diff --git a/integration/microservices/src/rmq/topic-exchange-rmq.controller.ts b/integration/microservices/src/rmq/topic-exchange-rmq.controller.ts new file mode 100644 index 000000000..53474a10c --- /dev/null +++ b/integration/microservices/src/rmq/topic-exchange-rmq.controller.ts @@ -0,0 +1,36 @@ +import { Controller, Get } from '@nestjs/common'; +import { + ClientProxy, + ClientProxyFactory, + Ctx, + MessagePattern, + RmqContext, + Transport, +} from '@nestjs/microservices'; +import { lastValueFrom } from 'rxjs'; + +@Controller() +export class RMQTopicExchangeController { + client: ClientProxy; + + constructor() { + this.client = ClientProxyFactory.create({ + transport: Transport.RMQ, + options: { + urls: [`amqp://localhost:5672`], + queue: 'test', + topicExchange: 'test', + }, + }); + } + + @Get('topic-exchange') + async topicExchange() { + return lastValueFrom(this.client.send('wildcard.a.b', 1)); + } + + @MessagePattern('wildcard.*.*') + handleTopicExchange(@Ctx() ctx: RmqContext): string { + return ctx.getPattern(); + } +} diff --git a/packages/microservices/client/client-rmq.ts b/packages/microservices/client/client-rmq.ts index fa75d0fc3..15b7ae88f 100644 --- a/packages/microservices/client/client-rmq.ts +++ b/packages/microservices/client/client-rmq.ts @@ -1,7 +1,7 @@ import { Logger } from '@nestjs/common/services/logger.service'; import { loadPackage } from '@nestjs/common/utils/load-package.util'; import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util'; -import { isFunction } from '@nestjs/common/utils/shared.utils'; +import { isFunction, isString } from '@nestjs/common/utils/shared.utils'; import { EventEmitter } from 'events'; import { EmptyError, @@ -55,8 +55,8 @@ export class ClientRMQ extends ClientProxy { protected readonly logger = new Logger(ClientProxy.name); protected connection$: ReplaySubject; protected connectionPromise: Promise; - protected client: AmqpConnectionManager = null; - protected channel: ChannelWrapper = null; + protected client: AmqpConnectionManager | null = null; + protected channel: ChannelWrapper | null = null; protected pendingEventListeners: Array<{ event: keyof RmqEvents; callback: RmqEvents[keyof RmqEvents]; @@ -113,7 +113,7 @@ export class ClientRMQ extends ClientProxy { this.registerDisconnectListener(this.client); this.registerConnectListener(this.client); this.pendingEventListeners.forEach(({ event, callback }) => - this.client.on(event, callback), + this.client!.on(event, callback), ); this.pendingEventListeners = []; @@ -140,7 +140,7 @@ export class ClientRMQ extends ClientProxy { public createChannel(): Promise { return new Promise(resolve => { - this.channel = this.client.createChannel({ + this.channel = this.client!.createChannel({ json: false, setup: (channel: Channel) => this.setupChannel(channel, resolve), }); @@ -224,8 +224,8 @@ export class ClientRMQ extends ClientProxy { const noAck = this.getOptionsProp(this.options, 'noAck', RQM_DEFAULT_NOACK); await channel.consume( this.replyQueue, - (msg: ConsumeMessage) => - this.responseEmitter.emit(msg.properties.correlationId, msg), + (msg: ConsumeMessage | null) => + this.responseEmitter.emit(msg!.properties.correlationId, msg), { noAck, }, @@ -359,23 +359,35 @@ export class ClientRMQ extends ClientProxy { delete serializedPacket.options; this.responseEmitter.on(correlationId, listener); - this.channel - .sendToQueue( - this.queue, - Buffer.from(JSON.stringify(serializedPacket)), - { - replyTo: this.replyQueue, - persistent: this.getOptionsProp( - this.options, - 'persistent', - RQM_DEFAULT_PERSISTENT, - ), - ...options, - headers: this.mergeHeaders(options?.headers), - correlationId, - }, - ) - .catch(err => callback({ err })); + + const content = Buffer.from(JSON.stringify(serializedPacket)); + const sendOptions = { + replyTo: this.replyQueue, + persistent: this.getOptionsProp( + this.options, + 'persistent', + RQM_DEFAULT_PERSISTENT, + ), + ...options, + headers: this.mergeHeaders(options?.headers), + correlationId, + }; + + if (this.options.topicExchange) { + const stringifiedPattern = isString(message.pattern) + ? message.pattern + : JSON.stringify(message.pattern); + this.channel!.publish( + this.options.topicExchange, + stringifiedPattern, + content, + sendOptions, + ).catch(err => callback({ err })); + } else { + this.channel!.sendToQueue(this.queue, content, sendOptions).catch(err => + callback({ err }), + ); + } return () => this.responseEmitter.removeListener(correlationId, listener); } catch (err) { callback({ err }); @@ -390,22 +402,37 @@ export class ClientRMQ extends ClientProxy { const options = serializedPacket.options; delete serializedPacket.options; - return new Promise((resolve, reject) => - this.channel.sendToQueue( - this.queue, - Buffer.from(JSON.stringify(serializedPacket)), - { - persistent: this.getOptionsProp( - this.options, - 'persistent', - RQM_DEFAULT_PERSISTENT, - ), - ...options, - headers: this.mergeHeaders(options?.headers), - }, - (err: unknown) => (err ? reject(err as Error) : resolve()), - ), - ); + return new Promise((resolve, reject) => { + const content = Buffer.from(JSON.stringify(serializedPacket)); + const sendOptions = { + persistent: this.getOptionsProp( + this.options, + 'persistent', + RQM_DEFAULT_PERSISTENT, + ), + ...options, + headers: this.mergeHeaders(options?.headers), + }; + const errorCallback = (err: unknown) => + err ? reject(err as Error) : resolve(); + + return this.options.topicExchange + ? this.channel!.publish( + this.options.topicExchange, + isString(packet.pattern) + ? packet.pattern + : JSON.stringify(packet.pattern), + content, + sendOptions, + errorCallback, + ) + : this.channel!.sendToQueue( + this.queue, + content, + sendOptions, + errorCallback, + ); + }); } protected initializeSerializer(options: RmqOptions['options']) { diff --git a/packages/microservices/interfaces/microservice-configuration.interface.ts b/packages/microservices/interfaces/microservice-configuration.interface.ts index 0c371076b..079dcb4ef 100644 --- a/packages/microservices/interfaces/microservice-configuration.interface.ts +++ b/packages/microservices/interfaces/microservice-configuration.interface.ts @@ -231,6 +231,13 @@ export interface RmqOptions { persistent?: boolean; headers?: Record; noAssert?: boolean; + /** + * Set only if you want to use Topic Exchange for routing messages to queues. + * Enabling this will allow you to use wildcards (*, #) as message and event patterns. + * Topic exchange can have any arbitrary name, but it should be the same for the producer (client) and consumer (server). + * @see https://www.rabbitmq.com/tutorials/tutorial-five-python#topic-exchange + */ + topicExchange?: string; /** * Maximum number of connection attempts. * Applies only to the consumer configuration. diff --git a/packages/microservices/server/server-rmq.ts b/packages/microservices/server/server-rmq.ts index 20fdea33e..068714d73 100644 --- a/packages/microservices/server/server-rmq.ts +++ b/packages/microservices/server/server-rmq.ts @@ -21,7 +21,7 @@ import { RmqContext } from '../ctx-host'; import { Transport } from '../enums'; import { RmqEvents, RmqEventsMap, RmqStatus } from '../events/rmq.events'; import { RmqUrl } from '../external/rmq-url.interface'; -import { RmqOptions } from '../interfaces'; +import { MessageHandler, RmqOptions } from '../interfaces'; import { IncomingRequest, OutgoingResponse, @@ -53,13 +53,14 @@ const INFINITE_CONNECTION_ATTEMPTS = -1; export class ServerRMQ extends Server { public readonly transportId = Transport.RMQ; - protected server: AmqpConnectionManager = null; - protected channel: ChannelWrapper = null; + protected server: AmqpConnectionManager | null = null; + protected channel: ChannelWrapper | null = null; protected connectionAttempts = 0; protected readonly urls: string[] | RmqUrl[]; protected readonly queue: string; protected readonly noAck: boolean; protected readonly queueOptions: any; + protected readonly wildcardHandlers = new Map(); protected pendingEventListeners: Array<{ event: keyof RmqEvents; callback: RmqEvents[keyof RmqEvents]; @@ -106,12 +107,12 @@ export class ServerRMQ extends Server { callback?: (err?: unknown, ...optionalParams: unknown[]) => void, ) { this.server = this.createClient(); - this.server.once(RmqEventsMap.CONNECT, () => { + this.server!.once(RmqEventsMap.CONNECT, () => { if (this.channel) { return; } this._status$.next(RmqStatus.CONNECTED); - this.channel = this.server.createChannel({ + this.channel = this.server!.createChannel({ json: false, setup: (channel: any) => this.setupChannel(channel, callback!), }); @@ -126,12 +127,12 @@ export class ServerRMQ extends Server { this.registerConnectListener(); this.registerDisconnectListener(); this.pendingEventListeners.forEach(({ event, callback }) => - this.server.on(event, callback), + this.server!.on(event, callback), ); this.pendingEventListeners = []; const connectFailedEvent = 'connectFailed'; - this.server.once(connectFailedEvent, (error: Record) => { + this.server!.once(connectFailedEvent, (error: Record) => { this._status$.next(RmqStatus.DISCONNECTED); this.logger.error(CONNECTION_FAILED_MESSAGE); @@ -162,13 +163,13 @@ export class ServerRMQ extends Server { } private registerConnectListener() { - this.server.on(RmqEventsMap.CONNECT, (err: any) => { + this.server!.on(RmqEventsMap.CONNECT, (err: any) => { this._status$.next(RmqStatus.CONNECTED); }); } private registerDisconnectListener() { - this.server.on(RmqEventsMap.DISCONNECT, (err: any) => { + this.server!.on(RmqEventsMap.DISCONNECT, (err: any) => { this._status$.next(RmqStatus.DISCONNECTED); this.logger.error(DISCONNECTED_RMQ_MESSAGE); this.logger.error(err); @@ -207,6 +208,21 @@ export class ServerRMQ extends Server { ); } + // When "Topic exchange" is used, we need to bind the queue to the exchange + // with all the routing keys used by the handlers + if (this.options.topicExchange) { + const routingKeys = Array.from(this.getHandlers().keys()); + await Promise.all( + routingKeys.map(routingKey => + channel.bindQueue(this.queue, this.options.topicExchange, routingKey), + ), + ); + + // "Topic exchange" supports wildcards, so we need to initialize wildcard handlers + // otherwise we would not be able to associate the incoming messages with the handlers + this.initializeWildcardHandlersIfExist(); + } + await channel.prefetch(prefetchCount, isGlobalPrefetchCount); channel.consume( this.queue, @@ -246,7 +262,7 @@ export class ServerRMQ extends Server { if (!handler) { if (!this.noAck) { this.logger.warn(RQM_NO_MESSAGE_HANDLER`${pattern}`); - this.channel.nack(rmqContext.getMessage() as Message, false, false); + this.channel!.nack(rmqContext.getMessage() as Message, false, false); } const status = 'error'; const noHandlerPacket = { @@ -277,7 +293,7 @@ export class ServerRMQ extends Server { ): Promise { const handler = this.getHandlerByPattern(pattern); if (!handler && !this.noAck) { - this.channel.nack(context.getMessage() as Message, false, false); + this.channel!.nack(context.getMessage() as Message, false, false); return this.logger.warn(RQM_NO_EVENT_HANDLER`${pattern}`); } return super.handleEvent(pattern, packet, context); @@ -295,7 +311,8 @@ export class ServerRMQ extends Server { delete outgoingResponse.options; const buffer = Buffer.from(JSON.stringify(outgoingResponse)); - this.channel.sendToQueue(replyTo, buffer, { correlationId, ...options }); + const sendOptions = { correlationId, ...options }; + this.channel!.sendToQueue(replyTo, buffer, sendOptions); } public unwrap(): T { @@ -318,6 +335,31 @@ export class ServerRMQ extends Server { } } + public getHandlerByPattern(pattern: string): MessageHandler | null { + if (!this.options.topicExchange) { + // When "Topic exchange" is not used, wildcards are not supported + // so we can fallback to the default behavior + return super.getHandlerByPattern(pattern); + } + + // Search for non-wildcard handler first + const handler = super.getHandlerByPattern(pattern); + if (handler) { + return handler; + } + + // Search for wildcard handler + if (this.wildcardHandlers.size === 0) { + return null; + } + for (const [regex, handler] of this.wildcardHandlers) { + if (regex.test(pattern)) { + return handler; + } + } + return null; + } + protected initializeSerializer(options: RmqOptions['options']) { this.serializer = options?.serializer ?? new RmqRecordSerializer(); } @@ -329,4 +371,28 @@ export class ServerRMQ extends Server { return content.toString(); } } + + private initializeWildcardHandlersIfExist() { + if (this.wildcardHandlers.size !== 0) { + return; + } + const handlers = this.getHandlers(); + + handlers.forEach((handler, pattern) => { + const regex = this.convertRoutingKeyToRegex(pattern); + if (regex) { + this.wildcardHandlers.set(regex, handler); + } + }); + } + + private convertRoutingKeyToRegex(routingKey: string): RegExp | undefined { + if (!routingKey.includes('#') && !routingKey.includes('*')) { + return; + } + let regexPattern = routingKey.replace(/\./g, '\\.'); + regexPattern = regexPattern.replace(/\*/g, '[^.]+'); + regexPattern = regexPattern.replace(/#/g, '.*'); + return new RegExp(`^${regexPattern}$`); + } } diff --git a/tools/gulp/tasks/move.ts b/tools/gulp/tasks/move.ts index 0991e1e6f..666901c39 100644 --- a/tools/gulp/tasks/move.ts +++ b/tools/gulp/tasks/move.ts @@ -3,11 +3,7 @@ import { join } from 'path'; import { samplePath } from '../config'; import { containsPackageJson, getDirs } from '../util/task-helpers'; -const distFiles = src([ - 'packages/**/*', - '!packages/**/*.ts', - 'packages/**/*.d.ts', -]); +const distFiles = src(['packages/**/*.js', 'packages/**/*.d.ts']); /** * Moves the compiled nest files into "node_module" folder.