refactor: introduce wildcards attribute, use existing exchange

This commit is contained in:
Kamil Myśliwiec
2025-01-31 08:50:10 +01:00
parent 23e76fd296
commit d2948d9522
5 changed files with 136 additions and 41 deletions

View File

@@ -4,7 +4,7 @@ import { Test } from '@nestjs/testing';
import * as request from 'supertest';
import { RMQTopicExchangeController } from '../src/rmq/topic-exchange-rmq.controller';
describe('RabbitMQ transport (Topic Exchange)', () => {
describe('RabbitMQ transport (Topic Exchange - wildcards)', () => {
let server: any;
let app: INestApplication;
@@ -21,7 +21,7 @@ describe('RabbitMQ transport (Topic Exchange)', () => {
options: {
urls: [`amqp://0.0.0.0:5672`],
queue: 'test2',
topicExchange: 'test',
wildcards: true,
},
});
await app.startAllMicroservices();
@@ -30,7 +30,7 @@ describe('RabbitMQ transport (Topic Exchange)', () => {
it(`should send message to wildcard topic exchange`, () => {
return request(server).get('/topic-exchange').expect(200, 'wildcard.a.b');
}).timeout(10000);
});
afterEach(async () => {
await app.close();

View File

@@ -19,7 +19,7 @@ export class RMQTopicExchangeController {
options: {
urls: [`amqp://localhost:5672`],
queue: 'test2',
topicExchange: 'test',
wildcards: true,
},
});
}

View File

@@ -216,6 +216,22 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
);
}
if (this.options.wildcards) {
const exchange = this.getOptionsProp(
this.options,
'exchange',
this.options.queue,
);
const exchangeType = this.getOptionsProp(
this.options,
'exchangeType',
'topic',
);
await channel.assertExchange(exchange, exchangeType, {
durable: true,
});
}
await channel.prefetch(prefetchCount, isGlobalPrefetchCount);
await this.consumeChannel(channel);
resolve();
@@ -374,12 +390,21 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
correlationId,
};
if (this.options.topicExchange) {
if (this.options.wildcards) {
const stringifiedPattern = isString(message.pattern)
? message.pattern
: JSON.stringify(message.pattern);
// The exchange is the same as the queue when wildcards are enabled
// and the exchange is not explicitly set
const exchange = this.getOptionsProp(
this.options,
'exchange',
this.queue,
);
this.channel!.publish(
this.options.topicExchange,
exchange,
stringifiedPattern,
content,
sendOptions,
@@ -417,9 +442,11 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
const errorCallback = (err: unknown) =>
err ? reject(err as Error) : resolve();
return this.options.topicExchange
return this.options.wildcards
? this.channel!.publish(
this.options.topicExchange,
// The exchange is the same as the queue when wildcards are enabled
// and the exchange is not explicitly set
this.getOptionsProp(this.options, 'exchange', this.queue),
isString(packet.pattern)
? packet.pattern
: JSON.stringify(packet.pattern),

View File

@@ -215,29 +215,89 @@ export interface NatsOptions {
export interface RmqOptions {
transport?: Transport.RMQ;
options?: {
/**
* An array of connection URLs to try in order.
*/
urls?: string[] | RmqUrl[];
/**
* The name of the queue.
*/
queue?: string;
/**
* A prefetch count for this channel. The count given is the maximum number of messages sent over the channel that can be awaiting acknowledgement;
* once there are count messages outstanding, the server will not send more messages on this channel until one or more have been acknowledged.
*/
prefetchCount?: number;
/**
* Sets the per-channel behavior for prefetching messages.
*/
isGlobalPrefetchCount?: boolean;
/**
* Amqplib queue options.
* @see https://amqp-node.github.io/amqplib/channel_api.html#channel_assertQueue
*/
queueOptions?: AmqplibQueueOptions;
/**
* AMQP Connection Manager socket options.
*/
socketOptions?: AmqpConnectionManagerSocketOptions;
exchange?: string;
routingKey?: string;
/**
* Iif true, the broker wont expect an acknowledgement of messages delivered to this consumer; i.e., it will dequeue messages as soon as theyve been sent down the wire.
* @default false
*/
noAck?: boolean;
/**
* A name which the server will use to distinguish message deliveries for the consumer; mustnt be already in use on the channel. Its usually easier to omit this, in which case the server will create a random name and supply it in the reply.
*/
consumerTag?: string;
/**
* A serializer for the message payload.
*/
serializer?: Serializer;
/**
* A deserializer for the message payload.
*/
deserializer?: Deserializer;
/**
* A reply queue for the producer.
* @default 'amq.rabbitmq.reply-to'
*/
replyQueue?: string;
/**
* If truthy, the message will survive broker restarts provided its in a queue that also survives restarts.
*/
persistent?: boolean;
/**
* Additional headers to be sent with every message.
* Applies only to the producer configuration.
*/
headers?: Record<string, string>;
/**
* When false, a queue will not be asserted before consuming.
* @default false
*/
noAssert?: boolean;
/**
* Set only if you want to use Topic Exchange for routing messages to queues.
* Enabling this will allow you to use wildcards (*, #) as message and event patterns.
* Topic exchange can have any arbitrary name, but it should be the same for the producer (client) and consumer (server).
* @see https://www.rabbitmq.com/tutorials/tutorial-five-python#topic-exchange
* Name for the exchange. Defaults to the queue name when "wildcards" is set to true.
* @default ''
*/
topicExchange?: string;
exchange?: string;
/**
* Type of the exchange
* @default 'topic'
*/
exchangeType?: 'direct' | 'fanout' | 'topic' | 'headers';
/**
* Additional routing key for the topic exchange.
*/
routingKey?: string;
/**
* Set to true only if you want to use Topic Exchange for routing messages to queues.
* Enabling this will allow you to use wildcards (*, #) as message and event patterns.
* @see https://www.rabbitmq.com/tutorials/tutorial-five-python#topic-exchange
* @default false
*/
wildcards?: boolean;
/**
* Maximum number of connection attempts.
* Applies only to the consumer configuration.

View File

@@ -39,10 +39,12 @@ import { Server } from './server';
// import('amqp-connection-manager').AmqpConnectionManager;
// type ChannelWrapper = import('amqp-connection-manager').ChannelWrapper;
// type Message = import('amqplib').Message;
// type Channel = import('amqplib').Channel | import('amqplib').ConfirmChannel;
type AmqpConnectionManager = any;
type ChannelWrapper = any;
type Message = any;
type Channel = any;
let rmqPackage = {} as any; // as typeof import('amqp-connection-manager');
@@ -115,7 +117,7 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
this._status$.next(RmqStatus.CONNECTED);
this.channel = this.server!.createChannel({
json: false,
setup: (channel: any) => this.setupChannel(channel, callback!),
setup: (channel: Channel) => this.setupChannel(channel, callback!),
});
});
@@ -177,7 +179,7 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
});
}
public async setupChannel(channel: any, callback: Function) {
public async setupChannel(channel: Channel, callback: Function) {
const noAssert =
this.getOptionsProp(this.options, 'noAssert') ??
this.queueOptions.noAssert ??
@@ -198,36 +200,44 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
RQM_DEFAULT_PREFETCH_COUNT,
);
if (this.options.exchange && this.options.routingKey) {
await channel.assertExchange(this.options.exchange, 'topic', {
if (this.options.exchange || this.options.wildcards) {
// Use queue name as exchange name if exchange is not provided and "wildcards" is set to true
const exchange = this.getOptionsProp(
this.options,
'exchange',
this.options.queue,
);
const exchangeType = this.getOptionsProp(
this.options,
'exchangeType',
'topic',
);
await channel.assertExchange(exchange, exchangeType, {
durable: true,
});
await channel.bindQueue(
this.queue,
this.options.exchange,
this.options.routingKey,
);
if (this.options.routingKey) {
await channel.bindQueue(this.queue, exchange, this.options.routingKey);
}
// When "Topic exchange" is used, we need to bind the queue to the exchange
// with all the routing keys used by the handlers
if (this.options.topicExchange) {
if (this.options.wildcards) {
const routingKeys = Array.from(this.getHandlers().keys());
await Promise.all(
routingKeys.map(routingKey =>
channel.bindQueue(this.queue, this.options.topicExchange, routingKey),
channel.bindQueue(this.queue, exchange, routingKey),
),
);
// "Topic exchange" supports wildcards, so we need to initialize wildcard handlers
// When "wildcards" is set to true, we need to initialize wildcard handlers
// otherwise we would not be able to associate the incoming messages with the handlers
this.initializeWildcardHandlersIfExist();
}
}
await channel.prefetch(prefetchCount, isGlobalPrefetchCount);
channel.consume(
this.queue,
(msg: Record<string, any>) => this.handleMessage(msg, channel),
(msg: Record<string, any> | null) => this.handleMessage(msg!, channel),
{
noAck: this.noAck,
consumerTag: this.getOptionsProp(
@@ -337,9 +347,7 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
}
public getHandlerByPattern(pattern: string): MessageHandler | null {
if (!this.options.topicExchange) {
// When "Topic exchange" is not used, wildcards are not supported
// so we can fallback to the default behavior
if (!this.options.wildcards) {
return super.getHandlerByPattern(pattern);
}