fix(microservices): to nack when there is no matching handler

This commit is contained in:
ryoctrl
2023-05-19 21:54:22 +09:00
parent d352e6f138
commit 674bfc5e83
3 changed files with 54 additions and 4 deletions

View File

@@ -17,6 +17,7 @@ import {
RQM_DEFAULT_QUEUE,
RQM_DEFAULT_QUEUE_OPTIONS,
RQM_DEFAULT_URL,
RQM_NO_EVENT_HANDLER,
} from '../constants';
import { RmqContext } from '../ctx-host';
import { Transport } from '../enums';
@@ -25,6 +26,7 @@ import { CustomTransportStrategy, RmqOptions } from '../interfaces';
import {
IncomingRequest,
OutgoingResponse,
ReadPacket,
} from '../interfaces/packet.interface';
import { RmqRecordSerializer } from '../serializers/rmq-record.serializer';
import { Server } from './server';
@@ -42,6 +44,7 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
protected readonly urls: string[] | RmqUrl[];
protected readonly queue: string;
protected readonly prefetchCount: number;
protected readonly noAck: boolean;
protected readonly queueOptions: any;
protected readonly isGlobalPrefetchCount: boolean;
protected readonly noAssert: boolean;
@@ -54,6 +57,7 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
this.prefetchCount =
this.getOptionsProp(this.options, 'prefetchCount') ||
RQM_DEFAULT_PREFETCH_COUNT;
this.noAck = this.getOptionsProp(this.options, 'noAck', RQM_DEFAULT_NOACK);
this.isGlobalPrefetchCount =
this.getOptionsProp(this.options, 'isGlobalPrefetchCount') ||
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;
@@ -141,8 +145,6 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
}
public async setupChannel(channel: any, callback: Function) {
const noAck = this.getOptionsProp(this.options, 'noAck', RQM_DEFAULT_NOACK);
if (!this.queueOptions.noAssert) {
await channel.assertQueue(this.queue, this.queueOptions);
}
@@ -151,7 +153,7 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
this.queue,
(msg: Record<string, any>) => this.handleMessage(msg, channel),
{
noAck,
noAck: this.noAck,
},
);
callback();
@@ -200,6 +202,19 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
response$ && this.send(response$, publish);
}
public async handleEvent(
pattern: string,
packet: ReadPacket,
context: RmqContext,
): Promise<any> {
const handler = this.getHandlerByPattern(pattern);
if (!handler && !this.noAck) {
this.channel.nack(context.getMessage(), false, false);
return this.logger.warn(RQM_NO_EVENT_HANDLER`${pattern}`);
}
return super.handleEvent(pattern, packet, context);
}
public sendMessage<T = any>(
message: T,
replyTo: any,