feat(microservices): add support for topic exchange (rabbitmq)

This commit is contained in:
Kamil Myśliwiec
2025-01-30 13:52:50 +01:00
parent 08fce4ac5f
commit 823fbab75d
6 changed files with 227 additions and 57 deletions

View File

@@ -0,0 +1,38 @@
import { INestApplication } from '@nestjs/common';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { Test } from '@nestjs/testing';
import * as request from 'supertest';
import { RMQTopicExchangeController } from '../src/rmq/topic-exchange-rmq.controller';
describe('RabbitMQ transport (Topic Exchange)', () => {
let server: any;
let app: INestApplication;
beforeEach(async () => {
const module = await Test.createTestingModule({
controllers: [RMQTopicExchangeController],
}).compile();
app = module.createNestApplication();
server = app.getHttpAdapter().getInstance();
app.connectMicroservice<MicroserviceOptions>({
transport: Transport.RMQ,
options: {
urls: [`amqp://0.0.0.0:5672`],
queue: 'test',
topicExchange: 'test',
},
});
await app.startAllMicroservices();
await app.init();
});
it(`should send message to wildcard topic exchange`, () => {
return request(server).get('/topic-exchange').expect(200, 'wildcard.a.b');
});
afterEach(async () => {
await app.close();
});
});

View File

@@ -0,0 +1,36 @@
import { Controller, Get } from '@nestjs/common';
import {
ClientProxy,
ClientProxyFactory,
Ctx,
MessagePattern,
RmqContext,
Transport,
} from '@nestjs/microservices';
import { lastValueFrom } from 'rxjs';
@Controller()
export class RMQTopicExchangeController {
client: ClientProxy;
constructor() {
this.client = ClientProxyFactory.create({
transport: Transport.RMQ,
options: {
urls: [`amqp://localhost:5672`],
queue: 'test',
topicExchange: 'test',
},
});
}
@Get('topic-exchange')
async topicExchange() {
return lastValueFrom(this.client.send<string>('wildcard.a.b', 1));
}
@MessagePattern('wildcard.*.*')
handleTopicExchange(@Ctx() ctx: RmqContext): string {
return ctx.getPattern();
}
}

View File

@@ -1,7 +1,7 @@
import { Logger } from '@nestjs/common/services/logger.service';
import { loadPackage } from '@nestjs/common/utils/load-package.util';
import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util';
import { isFunction } from '@nestjs/common/utils/shared.utils';
import { isFunction, isString } from '@nestjs/common/utils/shared.utils';
import { EventEmitter } from 'events';
import {
EmptyError,
@@ -55,8 +55,8 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
protected readonly logger = new Logger(ClientProxy.name);
protected connection$: ReplaySubject<any>;
protected connectionPromise: Promise<void>;
protected client: AmqpConnectionManager = null;
protected channel: ChannelWrapper = null;
protected client: AmqpConnectionManager | null = null;
protected channel: ChannelWrapper | null = null;
protected pendingEventListeners: Array<{
event: keyof RmqEvents;
callback: RmqEvents[keyof RmqEvents];
@@ -113,7 +113,7 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
this.registerDisconnectListener(this.client);
this.registerConnectListener(this.client);
this.pendingEventListeners.forEach(({ event, callback }) =>
this.client.on(event, callback),
this.client!.on(event, callback),
);
this.pendingEventListeners = [];
@@ -140,7 +140,7 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
public createChannel(): Promise<void> {
return new Promise(resolve => {
this.channel = this.client.createChannel({
this.channel = this.client!.createChannel({
json: false,
setup: (channel: Channel) => this.setupChannel(channel, resolve),
});
@@ -224,8 +224,8 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
const noAck = this.getOptionsProp(this.options, 'noAck', RQM_DEFAULT_NOACK);
await channel.consume(
this.replyQueue,
(msg: ConsumeMessage) =>
this.responseEmitter.emit(msg.properties.correlationId, msg),
(msg: ConsumeMessage | null) =>
this.responseEmitter.emit(msg!.properties.correlationId, msg),
{
noAck,
},
@@ -359,11 +359,9 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
delete serializedPacket.options;
this.responseEmitter.on(correlationId, listener);
this.channel
.sendToQueue(
this.queue,
Buffer.from(JSON.stringify(serializedPacket)),
{
const content = Buffer.from(JSON.stringify(serializedPacket));
const sendOptions = {
replyTo: this.replyQueue,
persistent: this.getOptionsProp(
this.options,
@@ -373,9 +371,23 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
...options,
headers: this.mergeHeaders(options?.headers),
correlationId,
},
)
.catch(err => callback({ err }));
};
if (this.options.topicExchange) {
const stringifiedPattern = isString(message.pattern)
? message.pattern
: JSON.stringify(message.pattern);
this.channel!.publish(
this.options.topicExchange,
stringifiedPattern,
content,
sendOptions,
).catch(err => callback({ err }));
} else {
this.channel!.sendToQueue(this.queue, content, sendOptions).catch(err =>
callback({ err }),
);
}
return () => this.responseEmitter.removeListener(correlationId, listener);
} catch (err) {
callback({ err });
@@ -390,11 +402,9 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
const options = serializedPacket.options;
delete serializedPacket.options;
return new Promise<void>((resolve, reject) =>
this.channel.sendToQueue(
this.queue,
Buffer.from(JSON.stringify(serializedPacket)),
{
return new Promise<void>((resolve, reject) => {
const content = Buffer.from(JSON.stringify(serializedPacket));
const sendOptions = {
persistent: this.getOptionsProp(
this.options,
'persistent',
@@ -402,10 +412,27 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
),
...options,
headers: this.mergeHeaders(options?.headers),
},
(err: unknown) => (err ? reject(err as Error) : resolve()),
),
};
const errorCallback = (err: unknown) =>
err ? reject(err as Error) : resolve();
return this.options.topicExchange
? this.channel!.publish(
this.options.topicExchange,
isString(packet.pattern)
? packet.pattern
: JSON.stringify(packet.pattern),
content,
sendOptions,
errorCallback,
)
: this.channel!.sendToQueue(
this.queue,
content,
sendOptions,
errorCallback,
);
});
}
protected initializeSerializer(options: RmqOptions['options']) {

View File

@@ -231,6 +231,13 @@ export interface RmqOptions {
persistent?: boolean;
headers?: Record<string, string>;
noAssert?: boolean;
/**
* Set only if you want to use Topic Exchange for routing messages to queues.
* Enabling this will allow you to use wildcards (*, #) as message and event patterns.
* Topic exchange can have any arbitrary name, but it should be the same for the producer (client) and consumer (server).
* @see https://www.rabbitmq.com/tutorials/tutorial-five-python#topic-exchange
*/
topicExchange?: string;
/**
* Maximum number of connection attempts.
* Applies only to the consumer configuration.

View File

@@ -21,7 +21,7 @@ import { RmqContext } from '../ctx-host';
import { Transport } from '../enums';
import { RmqEvents, RmqEventsMap, RmqStatus } from '../events/rmq.events';
import { RmqUrl } from '../external/rmq-url.interface';
import { RmqOptions } from '../interfaces';
import { MessageHandler, RmqOptions } from '../interfaces';
import {
IncomingRequest,
OutgoingResponse,
@@ -53,13 +53,14 @@ const INFINITE_CONNECTION_ATTEMPTS = -1;
export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
public readonly transportId = Transport.RMQ;
protected server: AmqpConnectionManager = null;
protected channel: ChannelWrapper = null;
protected server: AmqpConnectionManager | null = null;
protected channel: ChannelWrapper | null = null;
protected connectionAttempts = 0;
protected readonly urls: string[] | RmqUrl[];
protected readonly queue: string;
protected readonly noAck: boolean;
protected readonly queueOptions: any;
protected readonly wildcardHandlers = new Map<RegExp, MessageHandler>();
protected pendingEventListeners: Array<{
event: keyof RmqEvents;
callback: RmqEvents[keyof RmqEvents];
@@ -106,12 +107,12 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
callback?: (err?: unknown, ...optionalParams: unknown[]) => void,
) {
this.server = this.createClient();
this.server.once(RmqEventsMap.CONNECT, () => {
this.server!.once(RmqEventsMap.CONNECT, () => {
if (this.channel) {
return;
}
this._status$.next(RmqStatus.CONNECTED);
this.channel = this.server.createChannel({
this.channel = this.server!.createChannel({
json: false,
setup: (channel: any) => this.setupChannel(channel, callback!),
});
@@ -126,12 +127,12 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
this.registerConnectListener();
this.registerDisconnectListener();
this.pendingEventListeners.forEach(({ event, callback }) =>
this.server.on(event, callback),
this.server!.on(event, callback),
);
this.pendingEventListeners = [];
const connectFailedEvent = 'connectFailed';
this.server.once(connectFailedEvent, (error: Record<string, unknown>) => {
this.server!.once(connectFailedEvent, (error: Record<string, unknown>) => {
this._status$.next(RmqStatus.DISCONNECTED);
this.logger.error(CONNECTION_FAILED_MESSAGE);
@@ -162,13 +163,13 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
}
private registerConnectListener() {
this.server.on(RmqEventsMap.CONNECT, (err: any) => {
this.server!.on(RmqEventsMap.CONNECT, (err: any) => {
this._status$.next(RmqStatus.CONNECTED);
});
}
private registerDisconnectListener() {
this.server.on(RmqEventsMap.DISCONNECT, (err: any) => {
this.server!.on(RmqEventsMap.DISCONNECT, (err: any) => {
this._status$.next(RmqStatus.DISCONNECTED);
this.logger.error(DISCONNECTED_RMQ_MESSAGE);
this.logger.error(err);
@@ -207,6 +208,21 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
);
}
// When "Topic exchange" is used, we need to bind the queue to the exchange
// with all the routing keys used by the handlers
if (this.options.topicExchange) {
const routingKeys = Array.from(this.getHandlers().keys());
await Promise.all(
routingKeys.map(routingKey =>
channel.bindQueue(this.queue, this.options.topicExchange, routingKey),
),
);
// "Topic exchange" supports wildcards, so we need to initialize wildcard handlers
// otherwise we would not be able to associate the incoming messages with the handlers
this.initializeWildcardHandlersIfExist();
}
await channel.prefetch(prefetchCount, isGlobalPrefetchCount);
channel.consume(
this.queue,
@@ -246,7 +262,7 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
if (!handler) {
if (!this.noAck) {
this.logger.warn(RQM_NO_MESSAGE_HANDLER`${pattern}`);
this.channel.nack(rmqContext.getMessage() as Message, false, false);
this.channel!.nack(rmqContext.getMessage() as Message, false, false);
}
const status = 'error';
const noHandlerPacket = {
@@ -277,7 +293,7 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
): Promise<any> {
const handler = this.getHandlerByPattern(pattern);
if (!handler && !this.noAck) {
this.channel.nack(context.getMessage() as Message, false, false);
this.channel!.nack(context.getMessage() as Message, false, false);
return this.logger.warn(RQM_NO_EVENT_HANDLER`${pattern}`);
}
return super.handleEvent(pattern, packet, context);
@@ -295,7 +311,8 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
delete outgoingResponse.options;
const buffer = Buffer.from(JSON.stringify(outgoingResponse));
this.channel.sendToQueue(replyTo, buffer, { correlationId, ...options });
const sendOptions = { correlationId, ...options };
this.channel!.sendToQueue(replyTo, buffer, sendOptions);
}
public unwrap<T>(): T {
@@ -318,6 +335,31 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
}
}
public getHandlerByPattern(pattern: string): MessageHandler | null {
if (!this.options.topicExchange) {
// When "Topic exchange" is not used, wildcards are not supported
// so we can fallback to the default behavior
return super.getHandlerByPattern(pattern);
}
// Search for non-wildcard handler first
const handler = super.getHandlerByPattern(pattern);
if (handler) {
return handler;
}
// Search for wildcard handler
if (this.wildcardHandlers.size === 0) {
return null;
}
for (const [regex, handler] of this.wildcardHandlers) {
if (regex.test(pattern)) {
return handler;
}
}
return null;
}
protected initializeSerializer(options: RmqOptions['options']) {
this.serializer = options?.serializer ?? new RmqRecordSerializer();
}
@@ -329,4 +371,28 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
return content.toString();
}
}
private initializeWildcardHandlersIfExist() {
if (this.wildcardHandlers.size !== 0) {
return;
}
const handlers = this.getHandlers();
handlers.forEach((handler, pattern) => {
const regex = this.convertRoutingKeyToRegex(pattern);
if (regex) {
this.wildcardHandlers.set(regex, handler);
}
});
}
private convertRoutingKeyToRegex(routingKey: string): RegExp | undefined {
if (!routingKey.includes('#') && !routingKey.includes('*')) {
return;
}
let regexPattern = routingKey.replace(/\./g, '\\.');
regexPattern = regexPattern.replace(/\*/g, '[^.]+');
regexPattern = regexPattern.replace(/#/g, '.*');
return new RegExp(`^${regexPattern}$`);
}
}

View File

@@ -3,11 +3,7 @@ import { join } from 'path';
import { samplePath } from '../config';
import { containsPackageJson, getDirs } from '../util/task-helpers';
const distFiles = src([
'packages/**/*',
'!packages/**/*.ts',
'packages/**/*.d.ts',
]);
const distFiles = src(['packages/**/*.js', 'packages/**/*.d.ts']);
/**
* Moves the compiled nest files into "node_module" folder.