feat(microservices): add gracefull shutdown option for nats server

This commit is contained in:
Ali Nowrouzi
2024-08-28 22:30:10 +03:30
parent 731e8069a4
commit d20a1e580f
5 changed files with 105 additions and 13 deletions

View File

@@ -71,3 +71,5 @@ export const EADDRINUSE = 'EADDRINUSE';
export const CONNECTION_FAILED_MESSAGE =
'Connection to transport failed. Trying to reconnect...';
export const NATS_DEFAULT_GRACE_PERIOD = 10000;

View File

@@ -50,7 +50,7 @@ interface Sub<T> extends AsyncIterable<T> {
getMax(): number | undefined;
}
declare type Subscription = Sub<NatsMsg>;
export declare type Subscription = Sub<NatsMsg>;
declare enum Events {
Disconnect = 'disconnect',

View File

@@ -193,6 +193,8 @@ export interface NatsOptions {
token?: string;
yieldTime?: number;
tokenHandler?: any;
gracefulShutdown?: boolean;
gracePeriod?: number;
[key: string]: any;
};
}

View File

@@ -1,9 +1,17 @@
import { isUndefined, isObject } from '@nestjs/common/utils/shared.utils';
import { NATS_DEFAULT_URL, NO_MESSAGE_HANDLER } from '../constants';
import {
NATS_DEFAULT_GRACE_PERIOD,
NATS_DEFAULT_URL,
NO_MESSAGE_HANDLER,
} from '../constants';
import { NatsContext } from '../ctx-host/nats.context';
import { NatsRequestJSONDeserializer } from '../deserializers/nats-request-json.deserializer';
import { Transport } from '../enums';
import { Client, NatsMsg } from '../external/nats-client.interface';
import {
Client,
NatsMsg,
Subscription,
} from '../external/nats-client.interface';
import { CustomTransportStrategy } from '../interfaces';
import { NatsOptions } from '../interfaces/microservice-configuration.interface';
import { IncomingRequest } from '../interfaces/packet.interface';
@@ -21,6 +29,10 @@ export class ServerNats extends Server implements CustomTransportStrategy {
private natsClient: Client;
private readonly subscriptions: Subscription[] = [];
private readonly gracePeriod: number;
constructor(private readonly options: NatsOptions['options']) {
super();
@@ -28,6 +40,10 @@ export class ServerNats extends Server implements CustomTransportStrategy {
require('nats'),
);
this.gracePeriod =
this.getOptionsProp(this.options, 'gracePeriod') ||
NATS_DEFAULT_GRACE_PERIOD;
this.initializeSerializer(options);
this.initializeDeserializer(options);
}
@@ -60,12 +76,30 @@ export class ServerNats extends Server implements CustomTransportStrategy {
});
const registeredPatterns = [...this.messageHandlers.keys()];
registeredPatterns.forEach(channel => subscribe(channel));
for (const channel of registeredPatterns) {
const sub = subscribe(channel);
this.subscriptions.push(sub);
}
}
private async waitForGracePeriod() {
await new Promise<void>(res => {
setTimeout(() => {
res();
}, this.gracePeriod);
});
}
public async close() {
await this.natsClient?.close();
this.natsClient = null;
if (this.natsClient) {
const graceful = this.getOptionsProp(this.options, 'gracefulShutdown');
if (graceful) {
this.subscriptions.forEach(sub => sub.unsubscribe());
await this.waitForGracePeriod();
}
await this.natsClient?.close();
this.natsClient = null;
}
}
public createNatsClient(): Promise<Client> {

View File

@@ -41,13 +41,64 @@ describe('ServerNats', () => {
beforeEach(() => {
(server as any).natsClient = natsClient;
});
it('should close natsClient', () => {
server.close();
it('should close natsClient', async () => {
await server.close();
expect(natsClient.close.called).to.be.true;
});
describe('when "gracefulShutdown" is true', () => {
const waitForGracePeriod = sinon.spy();
const subscriptions = [
{ unsubscribe: sinon.spy() },
{ unsubscribe: sinon.spy() },
];
beforeEach(() => {
(server as any).subscriptions = subscriptions;
(server as any).waitForGracePeriod = waitForGracePeriod;
(server as any).options.gracefulShutdown = true;
});
it('should unsubscribe all subscriptions', async () => {
await server.close();
for (const subscription of subscriptions) {
expect(subscription.unsubscribe.calledOnce).to.be.true;
}
});
it('should call "waitForGracePeriod"', async () => {
await server.close();
expect(waitForGracePeriod.called).to.be.true;
});
});
describe('when "gracefulShutdown" is false', () => {
const waitForGracePeriod = sinon.spy();
const subscriptions = [
{ unsubscribe: sinon.spy() },
{ unsubscribe: sinon.spy() },
];
beforeEach(() => {
(server as any).subscriptions = subscriptions;
(server as any).waitForGracePeriod = waitForGracePeriod;
(server as any).options.gracefulShutdown = false;
});
it('should not unsubscribe all subscriptions', async () => {
await server.close();
for (const subscription of subscriptions) {
expect(subscription.unsubscribe.called).to.be.false;
}
});
it('should not call "waitForGracePeriod"', async () => {
await server.close();
expect(waitForGracePeriod.called).to.be.false;
});
});
});
describe('bindEvents', () => {
let onSpy: sinon.SinonSpy, subscribeSpy: sinon.SinonSpy, natsClient;
const pattern = 'test';
const messageHandler = sinon.spy();
beforeEach(() => {
onSpy = sinon.spy();
@@ -56,16 +107,19 @@ describe('ServerNats', () => {
on: onSpy,
subscribe: subscribeSpy,
};
(server as any).messageHandlers = objectToMap({
[pattern]: messageHandler,
});
});
it('should subscribe to each acknowledge patterns', () => {
const pattern = 'test';
const handler = sinon.spy();
(server as any).messageHandlers = objectToMap({
[pattern]: handler,
});
server.bindEvents(natsClient);
expect(subscribeSpy.calledWith(pattern)).to.be.true;
});
it('should fill the subscriptions array properly', () => {
server.bindEvents(natsClient);
expect(server['subscriptions'].length).to.be.equals(1);
});
});
describe('getMessageHandler', () => {
it(`should return function`, () => {