mirror of
https://github.com/nestjs/nest.git
synced 2026-02-21 23:11:44 +00:00
fix(microservices): rmq generated queue name in server
Generated queue name use in the server channel setup process
This commit is contained in:
@@ -191,8 +191,16 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
|
||||
this.queueOptions.noAssert ??
|
||||
RQM_DEFAULT_NO_ASSERT;
|
||||
|
||||
if (!noAssert) {
|
||||
await channel.assertQueue(this.queue, this.queueOptions);
|
||||
let createdQueue: string;
|
||||
|
||||
if (this.queue === RQM_DEFAULT_QUEUE || !noAssert) {
|
||||
const { queue } = await channel.assertQueue(
|
||||
this.queue,
|
||||
this.queueOptions,
|
||||
);
|
||||
createdQueue = queue;
|
||||
} else {
|
||||
createdQueue = this.queue;
|
||||
}
|
||||
|
||||
const isGlobalPrefetchCount = this.getOptionsProp(
|
||||
@@ -225,7 +233,7 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
|
||||
|
||||
if (this.options.routingKey || this.options.exchangeType === 'fanout') {
|
||||
await channel.bindQueue(
|
||||
this.queue,
|
||||
createdQueue,
|
||||
exchange,
|
||||
this.options.exchangeType === 'fanout' ? '' : this.options.routingKey,
|
||||
);
|
||||
@@ -235,7 +243,7 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
|
||||
const routingKeys = Array.from(this.getHandlers().keys());
|
||||
await Promise.all(
|
||||
routingKeys.map(routingKey =>
|
||||
channel.bindQueue(this.queue, exchange, routingKey),
|
||||
channel.bindQueue(createdQueue, exchange, routingKey),
|
||||
),
|
||||
);
|
||||
|
||||
@@ -247,7 +255,7 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
|
||||
|
||||
await channel.prefetch(prefetchCount, isGlobalPrefetchCount);
|
||||
channel.consume(
|
||||
this.queue,
|
||||
createdQueue,
|
||||
(msg: Record<string, any> | null) => this.handleMessage(msg!, channel),
|
||||
{
|
||||
noAck: this.noAck,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { assert, expect } from 'chai';
|
||||
import * as sinon from 'sinon';
|
||||
import { NO_MESSAGE_HANDLER } from '../../constants';
|
||||
import { NO_MESSAGE_HANDLER, RQM_DEFAULT_QUEUE } from '../../constants';
|
||||
import { RmqContext } from '../../ctx-host';
|
||||
import { ServerRMQ } from '../../server/server-rmq';
|
||||
import { objectToMap } from './utils/object-to-map';
|
||||
@@ -195,7 +195,7 @@ describe('ServerRMQ', () => {
|
||||
};
|
||||
|
||||
channel = {
|
||||
assertQueue: sinon.spy(() => ({})),
|
||||
assertQueue: sinon.spy(() => ({ queue })),
|
||||
prefetch: sinon.spy(),
|
||||
consume: sinon.spy(),
|
||||
assertExchange: sinon.spy(() => ({})),
|
||||
@@ -208,6 +208,13 @@ describe('ServerRMQ', () => {
|
||||
await server.setupChannel(channel, () => null);
|
||||
expect(channel.assertQueue.calledWith(queue, queueOptions)).to.be.true;
|
||||
});
|
||||
it('should call "assertQueue" with queue and queue options when queue is default queue', async () => {
|
||||
server['queue' as any] = RQM_DEFAULT_QUEUE;
|
||||
|
||||
await server.setupChannel(channel, () => null);
|
||||
expect(channel.assertQueue.calledWith(RQM_DEFAULT_QUEUE, queueOptions)).to
|
||||
.be.true;
|
||||
});
|
||||
it('should not call "assertQueue" when noAssert is true', async () => {
|
||||
server['options' as any] = {
|
||||
...(server as any)['options'],
|
||||
@@ -218,13 +225,16 @@ describe('ServerRMQ', () => {
|
||||
expect(channel.assertQueue.called).not.to.be.true;
|
||||
});
|
||||
it('should call "bindQueue" with exchangeType is fanout', async () => {
|
||||
const namedQueue = 'exclusive-queue-name';
|
||||
channel.assertQueue = sinon.spy(() => ({ queue: namedQueue }));
|
||||
server['queue' as any] = RQM_DEFAULT_QUEUE;
|
||||
server['options' as any] = {
|
||||
...(server as any)['options'],
|
||||
exchangeType: 'fanout',
|
||||
exchange: exchange,
|
||||
};
|
||||
await server.setupChannel(channel, () => null);
|
||||
expect(channel.bindQueue.calledWith(queue, exchange, '')).to.be.true;
|
||||
expect(channel.bindQueue.calledWith(namedQueue, exchange, '')).to.be.true;
|
||||
});
|
||||
it('should call "prefetch" with prefetchCount and "isGlobalPrefetchCount"', async () => {
|
||||
await server.setupChannel(channel, () => null);
|
||||
|
||||
Reference in New Issue
Block a user