diff --git a/packages/microservices/client/client-rmq.ts b/packages/microservices/client/client-rmq.ts index 8386efae9..d58a4cd2d 100644 --- a/packages/microservices/client/client-rmq.ts +++ b/packages/microservices/client/client-rmq.ts @@ -12,14 +12,14 @@ import { import { first, map, retryWhen, scan, skip, switchMap } from 'rxjs/operators'; import { DISCONNECTED_RMQ_MESSAGE, - RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT, - RQM_DEFAULT_NO_ASSERT, - RQM_DEFAULT_NOACK, - RQM_DEFAULT_PERSISTENT, - RQM_DEFAULT_PREFETCH_COUNT, - RQM_DEFAULT_QUEUE, - RQM_DEFAULT_QUEUE_OPTIONS, - RQM_DEFAULT_URL, + RMQ_DEFAULT_IS_GLOBAL_PREFETCH_COUNT, + RMQ_DEFAULT_NO_ASSERT, + RMQ_DEFAULT_NOACK, + RMQ_DEFAULT_PERSISTENT, + RMQ_DEFAULT_PREFETCH_COUNT, + RMQ_DEFAULT_QUEUE, + RMQ_DEFAULT_QUEUE_OPTIONS, + RMQ_DEFAULT_URL, } from '../constants.js'; import { RmqEvents, RmqEventsMap, RmqStatus } from '../events/rmq.events.js'; import { ReadPacket, RmqOptions, WritePacket } from '../interfaces/index.js'; @@ -75,11 +75,11 @@ export class ClientRMQ extends ClientProxy { constructor(protected readonly options: Required['options']) { super(); - this.queue = this.getOptionsProp(this.options, 'queue', RQM_DEFAULT_QUEUE); + this.queue = this.getOptionsProp(this.options, 'queue', RMQ_DEFAULT_QUEUE); this.queueOptions = this.getOptionsProp( this.options, 'queueOptions', - RQM_DEFAULT_QUEUE_OPTIONS, + RMQ_DEFAULT_QUEUE_OPTIONS, ); this.replyQueue = this.getOptionsProp( this.options, @@ -89,7 +89,7 @@ export class ClientRMQ extends ClientProxy { this.noAssert = this.getOptionsProp(this.options, 'noAssert') ?? this.queueOptions.noAssert ?? - RQM_DEFAULT_NO_ASSERT; + RMQ_DEFAULT_NO_ASSERT; loadPackageSync('amqplib', ClientRMQ.name, () => createRequire(import.meta.url)('amqplib'), @@ -158,7 +158,7 @@ export class ClientRMQ extends ClientProxy { public createClient(): AmqpConnectionManager { const socketOptions = this.getOptionsProp(this.options, 'socketOptions'); - const urls = this.getOptionsProp(this.options, 'urls') || [RQM_DEFAULT_URL]; + const urls = this.getOptionsProp(this.options, 'urls') || [RMQ_DEFAULT_URL]; return rmqPackage.connect(urls, socketOptions); } @@ -207,10 +207,10 @@ export class ClientRMQ extends ClientProxy { public async setupChannel(channel: Channel, resolve: Function) { const prefetchCount = this.getOptionsProp(this.options, 'prefetchCount') || - RQM_DEFAULT_PREFETCH_COUNT; + RMQ_DEFAULT_PREFETCH_COUNT; const isGlobalPrefetchCount = this.getOptionsProp(this.options, 'isGlobalPrefetchCount') || - RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT; + RMQ_DEFAULT_IS_GLOBAL_PREFETCH_COUNT; if (!this.options.wildcards && this.options.exchangeType !== 'fanout') { if (!this.noAssert) { @@ -247,7 +247,7 @@ export class ClientRMQ extends ClientProxy { } public async consumeChannel(channel: Channel) { - const noAck = this.getOptionsProp(this.options, 'noAck', RQM_DEFAULT_NOACK); + const noAck = this.getOptionsProp(this.options, 'noAck', RMQ_DEFAULT_NOACK); await channel.consume( this.replyQueue, (msg: ConsumeMessage | null) => @@ -392,7 +392,7 @@ export class ClientRMQ extends ClientProxy { persistent: this.getOptionsProp( this.options, 'persistent', - RQM_DEFAULT_PERSISTENT, + RMQ_DEFAULT_PERSISTENT, ), ...options, headers: this.mergeHeaders(options?.headers), @@ -443,7 +443,7 @@ export class ClientRMQ extends ClientProxy { persistent: this.getOptionsProp( this.options, 'persistent', - RQM_DEFAULT_PERSISTENT, + RMQ_DEFAULT_PERSISTENT, ), ...options, headers: this.mergeHeaders(options?.headers), diff --git a/packages/microservices/constants.ts b/packages/microservices/constants.ts index 847a37134..d40d6a7e4 100644 --- a/packages/microservices/constants.ts +++ b/packages/microservices/constants.ts @@ -7,20 +7,20 @@ export const REDIS_DEFAULT_HOST = 'localhost'; export const NATS_DEFAULT_URL = 'nats://localhost:4222'; export const MQTT_DEFAULT_URL = 'mqtt://localhost:1883'; export const GRPC_DEFAULT_URL = 'localhost:5000'; -export const RQM_DEFAULT_URL = 'amqp://localhost'; +export const RMQ_DEFAULT_URL = 'amqp://localhost'; export const KAFKA_DEFAULT_BROKER = 'localhost:9092'; export const KAFKA_DEFAULT_CLIENT = 'nestjs-consumer'; export const KAFKA_DEFAULT_GROUP = 'nestjs-group'; export const MQTT_SEPARATOR = '/'; export const MQTT_WILDCARD_SINGLE = '+'; export const MQTT_WILDCARD_ALL = '#'; -export const RQM_DEFAULT_QUEUE = ''; -export const RQM_DEFAULT_PREFETCH_COUNT = 0; -export const RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT = false; -export const RQM_DEFAULT_QUEUE_OPTIONS = {}; -export const RQM_DEFAULT_NOACK = true; -export const RQM_DEFAULT_PERSISTENT = false; -export const RQM_DEFAULT_NO_ASSERT = false; +export const RMQ_DEFAULT_QUEUE = ''; +export const RMQ_DEFAULT_PREFETCH_COUNT = 0; +export const RMQ_DEFAULT_IS_GLOBAL_PREFETCH_COUNT = false; +export const RMQ_DEFAULT_QUEUE_OPTIONS = {}; +export const RMQ_DEFAULT_NOACK = true; +export const RMQ_DEFAULT_PERSISTENT = false; +export const RMQ_DEFAULT_NO_ASSERT = false; export const RMQ_SEPARATOR = '.'; export const RMQ_WILDCARD_SINGLE = '*'; export const RMQ_WILDCARD_ALL = '#'; @@ -40,12 +40,12 @@ export const PARAM_ARGS_METADATA = ROUTE_ARGS_METADATA; export const REQUEST_PATTERN_METADATA = 'microservices:request_pattern'; export const REPLY_PATTERN_METADATA = 'microservices:reply_pattern'; -export const RQM_NO_EVENT_HANDLER = ( +export const RMQ_NO_EVENT_HANDLER = ( text: TemplateStringsArray, pattern: string, ) => `An unsupported event was received. It has been negative acknowledged, so it will not be re-delivered. Pattern: ${pattern}`; -export const RQM_NO_MESSAGE_HANDLER = ( +export const RMQ_NO_MESSAGE_HANDLER = ( text: TemplateStringsArray, pattern: string, ) => diff --git a/packages/microservices/server/server-rmq.ts b/packages/microservices/server/server-rmq.ts index c6e84661c..0572ca049 100644 --- a/packages/microservices/server/server-rmq.ts +++ b/packages/microservices/server/server-rmq.ts @@ -7,15 +7,15 @@ import { RMQ_SEPARATOR, RMQ_WILDCARD_ALL, RMQ_WILDCARD_SINGLE, - RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT, - RQM_DEFAULT_NOACK, - RQM_DEFAULT_NO_ASSERT, - RQM_DEFAULT_PREFETCH_COUNT, - RQM_DEFAULT_QUEUE, - RQM_DEFAULT_QUEUE_OPTIONS, - RQM_DEFAULT_URL, - RQM_NO_EVENT_HANDLER, - RQM_NO_MESSAGE_HANDLER, + RMQ_DEFAULT_IS_GLOBAL_PREFETCH_COUNT, + RMQ_DEFAULT_NOACK, + RMQ_DEFAULT_NO_ASSERT, + RMQ_DEFAULT_PREFETCH_COUNT, + RMQ_DEFAULT_QUEUE, + RMQ_DEFAULT_QUEUE_OPTIONS, + RMQ_DEFAULT_URL, + RMQ_NO_EVENT_HANDLER, + RMQ_NO_MESSAGE_HANDLER, } from '../constants.js'; import { RmqContext } from '../ctx-host/index.js'; import { Transport } from '../enums/index.js'; @@ -73,13 +73,13 @@ export class ServerRMQ extends Server { constructor(protected readonly options: Required['options']) { super(); - this.urls = this.getOptionsProp(this.options, 'urls') || [RQM_DEFAULT_URL]; + this.urls = this.getOptionsProp(this.options, 'urls') || [RMQ_DEFAULT_URL]; this.queue = - this.getOptionsProp(this.options, 'queue') || RQM_DEFAULT_QUEUE; - this.noAck = this.getOptionsProp(this.options, 'noAck', RQM_DEFAULT_NOACK); + this.getOptionsProp(this.options, 'queue') || RMQ_DEFAULT_QUEUE; + this.noAck = this.getOptionsProp(this.options, 'noAck', RMQ_DEFAULT_NOACK); this.queueOptions = this.getOptionsProp(this.options, 'queueOptions') || - RQM_DEFAULT_QUEUE_OPTIONS; + RMQ_DEFAULT_QUEUE_OPTIONS; this.loadPackageSynchronously('amqplib', ServerRMQ.name, () => createRequire(import.meta.url)('amqplib'), @@ -190,11 +190,11 @@ export class ServerRMQ extends Server { const noAssert = this.getOptionsProp(this.options, 'noAssert') ?? this.queueOptions.noAssert ?? - RQM_DEFAULT_NO_ASSERT; + RMQ_DEFAULT_NO_ASSERT; let createdQueue: string; - if (this.queue === RQM_DEFAULT_QUEUE || !noAssert) { + if (this.queue === RMQ_DEFAULT_QUEUE || !noAssert) { const { queue } = await channel.assertQueue( this.queue, this.queueOptions, @@ -207,12 +207,12 @@ export class ServerRMQ extends Server { const isGlobalPrefetchCount = this.getOptionsProp( this.options, 'isGlobalPrefetchCount', - RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT, + RMQ_DEFAULT_IS_GLOBAL_PREFETCH_COUNT, ); const prefetchCount = this.getOptionsProp( this.options, 'prefetchCount', - RQM_DEFAULT_PREFETCH_COUNT, + RMQ_DEFAULT_PREFETCH_COUNT, ); if (this.options.exchange || this.options.wildcards) { @@ -292,7 +292,7 @@ export class ServerRMQ extends Server { if (!handler) { if (!this.noAck) { - this.logger.warn(RQM_NO_MESSAGE_HANDLER`${pattern}`); + this.logger.warn(RMQ_NO_MESSAGE_HANDLER`${pattern}`); this.channel!.nack(rmqContext.getMessage() as Message, false, false); } const status = 'error'; @@ -337,7 +337,7 @@ export class ServerRMQ extends Server { const handler = this.getHandlerByPattern(pattern); if (!handler && !this.noAck) { this.channel!.nack(context.getMessage() as Message, false, false); - return this.logger.warn(RQM_NO_EVENT_HANDLER`${pattern}`); + return this.logger.warn(RMQ_NO_EVENT_HANDLER`${pattern}`); } return super.handleEvent(pattern, packet, context); } diff --git a/packages/microservices/test/server/server-rmq.spec.ts b/packages/microservices/test/server/server-rmq.spec.ts index a6ba38a60..ef417b14d 100644 --- a/packages/microservices/test/server/server-rmq.spec.ts +++ b/packages/microservices/test/server/server-rmq.spec.ts @@ -1,4 +1,4 @@ -import { NO_MESSAGE_HANDLER, RQM_DEFAULT_QUEUE } from '../../constants.js'; +import { NO_MESSAGE_HANDLER, RMQ_DEFAULT_QUEUE } from '../../constants.js'; import { RmqContext } from '../../ctx-host/index.js'; import { ServerRMQ } from '../../server/server-rmq.js'; import { objectToMap } from './utils/object-to-map.js'; @@ -222,11 +222,11 @@ describe('ServerRMQ', () => { expect(channel.assertQueue).toHaveBeenCalledWith(queue, queueOptions); }); it('should call "assertQueue" with queue and queue options when queue is default queue', async () => { - server['queue' as any] = RQM_DEFAULT_QUEUE; + server['queue' as any] = RMQ_DEFAULT_QUEUE; await server.setupChannel(channel, () => null); expect(channel.assertQueue).toHaveBeenCalledWith( - RQM_DEFAULT_QUEUE, + RMQ_DEFAULT_QUEUE, queueOptions, ); }); @@ -242,7 +242,7 @@ describe('ServerRMQ', () => { it('should call "bindQueue" with exchangeType is fanout', async () => { const namedQueue = 'exclusive-queue-name'; channel.assertQueue = vi.fn(() => ({ queue: namedQueue })); - server['queue' as any] = RQM_DEFAULT_QUEUE; + server['queue' as any] = RMQ_DEFAULT_QUEUE; server['options' as any] = { ...(server as any)['options'], exchangeType: 'fanout',