fix: rabbitmq bindings and auto-generated queues

This commit is contained in:
Khan / 이창민
2024-11-11 15:31:19 +09:00
parent bc4667c15a
commit e231f9887e
4 changed files with 24 additions and 1 deletions

View File

@@ -192,6 +192,15 @@ export class ClientRMQ extends ClientProxy {
if (!this.noAssert) {
await channel.assertQueue(this.queue, this.queueOptions);
}
if (this.options.exchange && this.options.routingKey) {
await channel.bindQueue(
this.queue,
this.options.exchange,
this.options.routingKey,
);
}
await channel.prefetch(prefetchCount, isGlobalPrefetchCount);
await this.consumeChannel(channel);
resolve();

View File

@@ -33,7 +33,7 @@ export const PARAM_ARGS_METADATA = ROUTE_ARGS_METADATA;
export const REQUEST_PATTERN_METADATA = 'microservices:request_pattern';
export const REPLY_PATTERN_METADATA = 'microservices:reply_pattern';
export const RQM_DEFAULT_QUEUE = 'default';
export const RQM_DEFAULT_QUEUE = '';
export const RQM_DEFAULT_PREFETCH_COUNT = 0;
export const RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT = false;
export const RQM_DEFAULT_QUEUE_OPTIONS = {};

View File

@@ -209,6 +209,8 @@ export interface RmqOptions {
isGlobalPrefetchCount?: boolean;
queueOptions?: AmqplibQueueOptions;
socketOptions?: AmqpConnectionManagerSocketOptions;
exchange?: string;
routingKey?: string;
noAck?: boolean;
consumerTag?: string;
serializer?: Serializer;

View File

@@ -153,6 +153,18 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
if (!this.noAssert) {
await channel.assertQueue(this.queue, this.queueOptions);
}
if (this.options.exchange && this.options.routingKey) {
await channel.assertExchange(this.options.exchange, 'topic', {
durable: true,
});
await channel.bindQueue(
this.queue,
this.options.exchange,
this.options.routingKey,
);
}
await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
channel.consume(
this.queue,