Merge pull request #15190 from nestjs/feat/nats-v3-migration

feat(microservices): nats v3 upgrade
This commit is contained in:
Kamil Mysliwiec
2026-02-16 13:27:55 +01:00
committed by GitHub
14 changed files with 7715 additions and 3318 deletions

View File

@@ -26,7 +26,8 @@ export class DisconnectedClientController {
code === 'CONN_ERR' ||
code === 'ENOTFOUND' ||
code === 'CONNECTION_REFUSED' ||
error.message.includes('Connection is closed.')
error.message.includes('Connection is closed.') ||
error.message.includes('connection refused')
? new RequestTimeoutException('ECONNREFUSED')
: new InternalServerErrorException(),
);

View File

@@ -1,3 +1,4 @@
import * as nats from '@nats-io/nats-core';
import { Body, Controller, Get, HttpCode, Post, Query } from '@nestjs/common';
import {
ClientProxy,
@@ -11,7 +12,6 @@ import {
RpcException,
Transport,
} from '@nestjs/microservices';
import * as nats from 'nats';
import { from, lastValueFrom, Observable, of, throwError } from 'rxjs';
import { catchError, scan } from 'rxjs/operators';
import { NatsService } from './nats.service.js';

10697
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -97,6 +97,7 @@
"@fastify/view": "11.1.1",
"@grpc/grpc-js": "1.14.3",
"@grpc/proto-loader": "0.8.0",
"@nats-io/transport-node": "^3.0.2",
"@nestjs/apollo": "13.2.4",
"@nestjs/graphql": "13.2.4",
"@nestjs/mongoose": "11.0.4",
@@ -153,8 +154,7 @@
"mqtt": "5.15.0",
"multer": "2.0.2",
"mysql2": "3.17.1",
"nats": "2.29.3",
"prettier": "^3.7.4",
"prettier": "3.7.4",
"redis": "5.10.0",
"reusify": "1.1.0",
"rxjs-compat": "6.6.7",
@@ -165,9 +165,9 @@
"tsx": "4.19.2",
"typeorm": "0.3.28",
"typescript": "5.9.3",
"typescript-eslint": "^8.55.1-alpha.4",
"unplugin-swc": "^1.5.9",
"vitest": "^4.0.18",
"typescript-eslint": "8.55.1-alpha.4",
"unplugin-swc": "1.5.9",
"vitest": "4.0.18",
"ws": "8.19.0"
},
"engines": {

View File

@@ -1,5 +1,6 @@
import { RequestMethod } from '@nestjs/common';
import { loadPackage } from '@nestjs/common/utils/load-package.util.js';
import * as microservicesPackage from '@nestjs/microservices';
import { MicroserviceOptions } from '@nestjs/microservices';
import { ApplicationConfig } from '../application-config.js';
import { NestContainer } from '../injector/container.js';
@@ -12,12 +13,13 @@ describe('NestApplication', () => {
beforeAll(async () => {
// Pre-populate the package cache so that connectMicroservice()
// can synchronously retrieve @nestjs/microservices via loadPackageCached.
// Use the already-imported module to avoid a slow dynamic import() on CI.
await loadPackage(
'@nestjs/microservices',
'NestApplication tests',
() => import('@nestjs/microservices'),
() => microservicesPackage,
);
}, 30_000);
});
describe('Hybrid Application', () => {
class Interceptor {

View File

@@ -1,5 +1,7 @@
import { createRequire } from 'module';
import { Logger } from '@nestjs/common';
import { isObject, loadPackageSync } from '@nestjs/common/internal';
import { EventEmitter } from 'events';
import { createRequire } from 'module';
import { NATS_DEFAULT_URL } from '../constants.js';
import { NatsResponseJSONDeserializer } from '../deserializers/nats-response-json.deserializer.js';
import { EmptyResponseException } from '../errors/empty-response.exception.js';
@@ -17,8 +19,6 @@ import {
import { NatsRecord } from '../record-builders/index.js';
import { NatsRecordSerializer } from '../serializers/nats-record.serializer.js';
import { ClientProxy } from './client-proxy.js';
import { Logger } from '@nestjs/common';
import { loadPackageSync, isObject } from '@nestjs/common/internal';
let natsPackage = {} as any;
@@ -26,8 +26,8 @@ let natsPackage = {} as any;
// because it would require the user to install the nats package even if they dont use Nats
// Otherwise, TypeScript would fail to compile the code.
//
// type Client = import('nats').NatsConnection;
// type NatsMsg = import('nats').Msg;
// type Client = import('@nats-io/transport-node').NatsConnection;
// type NatsMsg = import('@nats-io/transport-node').Msg;
type Client = Record<string, any>;
type NatsMsg = Record<string, any>;
@@ -46,8 +46,10 @@ export class ClientNats extends ClientProxy<NatsEvents, NatsStatus> {
constructor(protected readonly options: Required<NatsOptions>['options']) {
super();
natsPackage = loadPackageSync('nats', ClientNats.name, () =>
createRequire(import.meta.url)('nats'),
natsPackage = loadPackageSync(
'@nats-io/transport-node',
ClientNats.name,
() => createRequire(import.meta.url)('@nats-io/transport-node'),
);
this.initializeSerializer(options);
@@ -101,15 +103,10 @@ export class ClientNats extends ClientProxy<NatsEvents, NatsStatus> {
public async handleStatusUpdates(client: Client) {
for await (const status of client.status()) {
const data =
status.data && isObject(status.data)
? JSON.stringify(status.data)
: status.data;
switch (status.type) {
case 'error':
this.logger.error(
`NatsError: type: "${status.type}", data: "${data}".`,
`NatsError: type: "${status.type}", error: "${status.error}".`,
);
break;
@@ -120,14 +117,12 @@ export class ClientNats extends ClientProxy<NatsEvents, NatsStatus> {
// Prevent unhandled promise rejection
this.connectionPromise.catch(() => {});
this.logger.error(
`NatsError: type: "${status.type}", data: "${data}".`,
);
this.logger.error(`NatsError: type: "${status.type}".`);
this._status$.next(NatsStatus.DISCONNECTED);
this.statusEventEmitter.emit(
NatsEventsMap.DISCONNECT,
status.data as string,
status.server as string,
);
break;
@@ -137,37 +132,42 @@ export class ClientNats extends ClientProxy<NatsEvents, NatsStatus> {
case 'reconnect':
this.connectionPromise = Promise.resolve(client);
this.logger.log(
`NatsStatus: type: "${status.type}", data: "${data}".`,
);
this.logger.log(`NatsStatus: type: "${status.type}".`);
this._status$.next(NatsStatus.CONNECTED);
this.statusEventEmitter.emit(
NatsEventsMap.RECONNECT,
status.data as string,
status.server as string,
);
break;
case 'pingTimer':
case 'ping':
if (this.options.debug) {
this.logger.debug(
`NatsStatus: type: "${status.type}", data: "${data}".`,
`NatsStatus: type: "${status.type}", pending pings: "${status.pendingPings}".`,
);
}
break;
case 'update':
this.logger.log(
`NatsStatus: type: "${status.type}", data: "${data}".`,
`NatsStatus: type: "${status.type}", added: "${status.added}", deleted: "${status.deleted}".`,
);
this.statusEventEmitter.emit(NatsEventsMap.UPDATE, status.data);
this.statusEventEmitter.emit(NatsEventsMap.UPDATE, undefined);
break;
default:
default: {
const data =
'data' in status && isObject(status.data)
? JSON.stringify(status.data)
: 'data' in status
? status.data
: '';
this.logger.log(
`NatsStatus: type: "${status.type}", data: "${data}".`,
);
break;
}
}
}
}
@@ -192,7 +192,7 @@ export class ClientNats extends ClientProxy<NatsEvents, NatsStatus> {
packet: ReadPacket & PacketId,
callback: (packet: WritePacket) => any,
) {
return async (error: string | Error | undefined, natsMsg: NatsMsg) => {
return async (error: Error | null, natsMsg: NatsMsg) => {
if (error) {
return callback({
err: error,
@@ -207,7 +207,7 @@ export class ClientNats extends ClientProxy<NatsEvents, NatsStatus> {
isDisposed: true,
});
}
const message = await this.deserializer.deserialize(rawPacket);
const message = await this.deserializer.deserialize(natsMsg);
if (message.id && message.id !== packet.id) {
return undefined;
}
@@ -244,7 +244,10 @@ export class ClientNats extends ClientProxy<NatsEvents, NatsStatus> {
);
const subscription = this.natsClient!.subscribe(inbox, {
callback: subscriptionHandler,
callback: subscriptionHandler as (
err: Error | null,
msg: NatsMsg,
) => Promise<never>,
});
const headers = this.mergeHeaders(serializedPacket.headers);

View File

@@ -1,36 +1,27 @@
import { createRequire } from 'module';
import { NatsCodec } from '../external/nats-codec.interface.js';
import { IncomingEvent, IncomingRequest } from '../interfaces/index.js';
import { IncomingRequestDeserializer } from './incoming-request.deserializer.js';
import { loadPackageSync } from '@nestjs/common/internal';
let natsPackage = {} as any;
// To enable type safety for Nats. This cant be uncommented by default
// because it would require the user to install the nats package even if they dont use Nats
// Otherwise, TypeScript would fail to compile the code.
//
// type NatsMsg = import('@nats-io/transport-node').Msg;
type NatsMsg = any;
/**
* @publicApi
*/
export class NatsRequestJSONDeserializer extends IncomingRequestDeserializer {
private readonly jsonCodec: NatsCodec<unknown>;
constructor() {
super();
natsPackage = loadPackageSync(
'nats',
NatsRequestJSONDeserializer.name,
() => createRequire(import.meta.url)('nats'),
);
this.jsonCodec = natsPackage.JSONCodec();
}
deserialize(
value: Uint8Array,
value: NatsMsg,
options?: Record<string, any>,
):
| IncomingRequest
| IncomingEvent
| Promise<IncomingRequest | IncomingEvent> {
const decodedRequest = this.jsonCodec.decode(value);
| Promise<IncomingRequest>
| Promise<IncomingEvent> {
const decodedRequest = value.json();
return super.deserialize(decodedRequest, options);
}
}

View File

@@ -1,33 +1,23 @@
import { createRequire } from 'module';
import { NatsCodec } from '../external/nats-codec.interface.js';
import { IncomingResponse } from '../interfaces/index.js';
import { IncomingResponseDeserializer } from './incoming-response.deserializer.js';
import { loadPackageSync } from '@nestjs/common/internal';
let natsPackage = {} as any;
// To enable type safety for Nats. This cant be uncommented by default
// because it would require the user to install the nats package even if they dont use Nats
// Otherwise, TypeScript would fail to compile the code.
//
// type NatsMsg = import('@nats-io/transport-node').Msg;
type NatsMsg = any;
/**
* @publicApi
*/
export class NatsResponseJSONDeserializer extends IncomingResponseDeserializer {
private readonly jsonCodec: NatsCodec<unknown>;
constructor() {
super();
natsPackage = loadPackageSync(
'nats',
NatsResponseJSONDeserializer.name,
() => createRequire(import.meta.url)('nats'),
);
this.jsonCodec = natsPackage.JSONCodec();
}
deserialize(
value: Uint8Array,
value: NatsMsg,
options?: Record<string, any>,
): IncomingResponse | Promise<IncomingResponse> {
const decodedRequest = this.jsonCodec.decode(value);
const decodedRequest = value.json();
return super.deserialize(decodedRequest, options);
}
}

View File

@@ -49,6 +49,7 @@
},
"peerDependencies": {
"@grpc/grpc-js": "*",
"@nats-io/transport-node": "*",
"@nestjs/common": "^11.0.0",
"@nestjs/core": "^11.0.0",
"@nestjs/websockets": "^11.0.0",
@@ -58,7 +59,6 @@
"ioredis": "*",
"kafkajs": "*",
"mqtt": "*",
"nats": "*",
"reflect-metadata": "^0.1.12 || ^0.2.0",
"rxjs": "^7.1.0"
},
@@ -66,6 +66,9 @@
"@grpc/grpc-js": {
"optional": true
},
"@nats-io/transport-node": {
"optional": true
},
"@nestjs/websockets": {
"optional": true
},
@@ -78,9 +81,6 @@
"mqtt": {
"optional": true
},
"nats": {
"optional": true
},
"ioredis": {
"optional": true
},

View File

@@ -1,25 +1,12 @@
import { createRequire } from 'module';
import { NatsCodec } from '../external/nats-codec.interface.js';
import { isObject } from '@nestjs/common/internal';
import { ReadPacket } from '../interfaces/index.js';
import { Serializer } from '../interfaces/serializer.interface.js';
import { NatsRecord, NatsRecordBuilder } from '../record-builders/index.js';
import { loadPackageSync, isObject } from '@nestjs/common/internal';
let natsPackage = {} as any;
export class NatsRecordSerializer implements Serializer<
ReadPacket,
NatsRecord
> {
private readonly jsonCodec: NatsCodec<unknown>;
constructor() {
natsPackage = loadPackageSync('nats', NatsRecordSerializer.name, () =>
createRequire(import.meta.url)('nats'),
);
this.jsonCodec = natsPackage.JSONCodec();
}
serialize(packet: any): NatsRecord {
const natsMessage =
packet?.data && isObject(packet.data) && packet.data instanceof NatsRecord
@@ -27,7 +14,7 @@ export class NatsRecordSerializer implements Serializer<
: new NatsRecordBuilder(packet?.data).build();
return {
data: this.jsonCodec.encode({ ...packet, data: natsMessage.data }),
data: JSON.stringify({ ...packet, data: natsMessage.data }),
headers: natsMessage.headers,
};
}

View File

@@ -1,3 +1,4 @@
import { isObject, isUndefined } from '@nestjs/common/internal';
import { EventEmitter } from 'events';
import {
NATS_DEFAULT_GRACE_PERIOD,
@@ -20,15 +21,14 @@ import { IncomingRequest } from '../interfaces/packet.interface.js';
import { NatsRecord } from '../record-builders/index.js';
import { NatsRecordSerializer } from '../serializers/nats-record.serializer.js';
import { Server } from './server.js';
import { isObject, isUndefined } from '@nestjs/common/internal';
// To enable type safety for Nats. This cant be uncommented by default
// because it would require the user to install the nats package even if they dont use Nats
// Otherwise, TypeScript would fail to compile the code.
//
// type Client = import('nats').NatsConnection;
// type NatsMsg = import('nats').Msg;
// type Subscription = import('nats').Subscription;
// type Client = import('@nats-io/transport-node').NatsConnection;
// type NatsMsg = import('@nats-io/transport-node').Msg;
// type Subscription = import('@nats-io/transport-node').Subscription;
type Client = any;
type NatsMsg = any;
@@ -123,25 +123,11 @@ export class ServerNats<
public async createNatsClient(): Promise<Client> {
const natsPackage = await this.loadPackage(
'nats',
'@nats-io/transport-node',
ServerNats.name,
() => import('nats'),
() => import('@nats-io/transport-node'),
);
// Eagerly initialize serializer/deserializer so they can be used synchronously
if (
this.serializer &&
typeof (this.serializer as any).init === 'function'
) {
await (this.serializer as any).init();
}
if (
this.deserializer &&
typeof (this.deserializer as any).init === 'function'
) {
await (this.deserializer as any).init();
}
const options = this.options || ({} as NatsOptions);
return natsPackage.connect({
servers: NATS_DEFAULT_URL,
@@ -164,7 +150,7 @@ export class ServerNats<
const replyTo = natsMsg.reply;
const natsCtx = new NatsContext([callerSubject, natsMsg.headers]);
const message = await this.deserializer.deserialize(rawMessage, {
const message = await this.deserializer.deserialize(natsMsg, {
channel,
replyTo,
});
@@ -216,34 +202,24 @@ export class ServerNats<
public async handleStatusUpdates(client: Client) {
for await (const status of client.status()) {
const data =
status.data && isObject(status.data)
? JSON.stringify(status.data)
: status.data;
switch (status.type) {
case 'error':
this.logger.error(
`NatsError: type: "${status.type}", data: "${data}".`,
`NatsError: type: "${status.type}", error: "${status.error}".`,
);
break;
case 'disconnect':
this.logger.error(
`NatsError: type: "${status.type}", data: "${data}".`,
);
this.logger.error(`NatsError: type: "${status.type}".`);
this._status$.next(NatsStatus.DISCONNECTED as S);
this.statusEventEmitter.emit(
NatsEventsMap.DISCONNECT,
status.data as string,
);
this.statusEventEmitter.emit(NatsEventsMap.DISCONNECT, status.server);
break;
case 'pingTimer':
case 'ping':
if (this.options.debug) {
this.logger.debug!(
`NatsStatus: type: "${status.type}", data: "${data}".`,
`NatsStatus: type: "${status.type}", pending pings: "${status.pendingPings}".`,
);
}
break;
@@ -253,29 +229,31 @@ export class ServerNats<
break;
case 'reconnect':
this.logger.log(
`NatsStatus: type: "${status.type}", data: "${data}".`,
);
this.logger.log(`NatsStatus: type: "${status.type}".`);
this._status$.next(NatsStatus.CONNECTED as S);
this.statusEventEmitter.emit(
NatsEventsMap.RECONNECT,
status.data as string,
);
this.statusEventEmitter.emit(NatsEventsMap.RECONNECT, status.server);
break;
case 'update':
this.logger.log(
`NatsStatus: type: "${status.type}", data: "${data}".`,
`NatsStatus: type: "${status.type}", added: "${status.added}", deleted: "${status.deleted}".`,
);
this.statusEventEmitter.emit(NatsEventsMap.UPDATE, status.data);
this.statusEventEmitter.emit(NatsEventsMap.UPDATE, undefined);
break;
default:
default: {
const data =
'data' in status && isObject(status.data)
? JSON.stringify(status.data)
: 'data' in status
? status.data
: '';
this.logger.log(
`NatsStatus: type: "${status.type}", data: "${data}".`,
);
break;
}
}
}
}

View File

@@ -1,6 +1,6 @@
import { headers as createHeaders, JSONCodec } from 'nats';
import { headers as createHeaders } from '@nats-io/transport-node';
import { ClientNats } from '../../client/client-nats.js';
import { ReadPacket } from '../../interfaces/index.js';
import { ReadPacket, WritePacket } from '../../interfaces/index.js';
import { NatsRecord } from '../../record-builders/index.js';
describe('ClientNats', () => {
@@ -93,7 +93,10 @@ describe('ClientNats', () => {
Object.assign(packet as object, { id }),
);
subscription = client['publish'](msg, callback);
subscription = client['publish'](
msg,
callback as (packet: WritePacket) => any,
);
subscription();
});
afterEach(() => {
@@ -160,7 +163,8 @@ describe('ClientNats', () => {
id: '1',
};
const natsMessage = {
data: JSONCodec().encode(responseMessage),
data: JSON.stringify(responseMessage),
json: () => responseMessage,
};
let callback: ReturnType<typeof vi.fn>, subscription;
@@ -169,7 +173,10 @@ describe('ClientNats', () => {
beforeEach(async () => {
callback = vi.fn();
subscription = client.createSubscriptionHandler(msg, callback);
subscription = client.createSubscriptionHandler(
msg,
callback as (packet: WritePacket) => any,
);
await subscription(undefined, natsMessage);
});
it('should call callback with expected arguments', () => {
@@ -182,12 +189,18 @@ describe('ClientNats', () => {
describe('disposed and "id" is correct', () => {
beforeEach(async () => {
callback = vi.fn();
subscription = client.createSubscriptionHandler(msg, callback);
await subscription(undefined, {
data: JSONCodec().encode({
subscription = client.createSubscriptionHandler(
msg,
callback as (packet: WritePacket) => any,
);
subscription(undefined, {
data: JSON.stringify({
...responseMessage,
isDisposed: true,
}),
json: function () {
return JSON.parse(this.data);
},
});
});
@@ -207,13 +220,16 @@ describe('ClientNats', () => {
...msg,
id: '2',
},
callback,
callback as (packet: WritePacket) => any,
);
subscription(undefined, {
data: JSONCodec().encode({
data: JSON.stringify({
...responseMessage,
isDisposed: true,
}),
json: function () {
return JSON.parse(this.data);
},
});
});
@@ -296,18 +312,20 @@ describe('ClientNats', () => {
const clientMock = {
status: vi.fn().mockReturnValue({
async *[Symbol.asyncIterator]() {
yield { type: 'disconnect', data: 'localhost' };
yield { type: 'error', data: {} };
yield { type: 'disconnect' };
yield { type: 'error', error: 'Test error' };
},
}),
};
await client.handleStatusUpdates(clientMock as any);
expect(logErrorSpy).toHaveBeenCalledTimes(2);
expect(logErrorSpy).toHaveBeenCalledWith(
'NatsError: type: "disconnect", data: "localhost".',
expect(logErrorSpy).toHaveBeenNthCalledWith(
1,
'NatsError: type: "disconnect".',
);
expect(logErrorSpy).toHaveBeenCalledWith(
'NatsError: type: "error", data: "{}".',
expect(logErrorSpy).toHaveBeenNthCalledWith(
2,
'NatsError: type: "error", error: "Test error".',
);
});
it('should log other statuses as "logs"', async () => {

View File

@@ -1,9 +1,7 @@
import * as nats from 'nats';
import * as nats from '@nats-io/nats-core';
import { NatsRecordBuilder } from '../../record-builders/index.js';
import { NatsRecordSerializer } from '../../serializers/nats-record.serializer.js';
const jsonCodec = nats.JSONCodec();
describe('NatsRecordSerializer', () => {
let instance: NatsRecordSerializer;
beforeEach(() => {
@@ -13,42 +11,42 @@ describe('NatsRecordSerializer', () => {
it('undefined', () => {
expect(instance.serialize({ data: undefined })).toEqual({
headers: undefined,
data: jsonCodec.encode({ data: undefined }),
data: JSON.stringify({ data: undefined }),
});
});
it('null', () => {
expect(instance.serialize({ data: null })).toEqual({
headers: undefined,
data: jsonCodec.encode({ data: null }),
data: JSON.stringify({ data: null }),
});
});
it('string', () => {
expect(instance.serialize({ data: 'string' })).toEqual({
headers: undefined,
data: jsonCodec.encode({ data: 'string' }),
data: JSON.stringify({ data: 'string' }),
});
});
it('number', () => {
expect(instance.serialize({ data: 12345 })).toEqual({
headers: undefined,
data: jsonCodec.encode({ data: 12345 }),
data: JSON.stringify({ data: 12345 }),
});
});
it('buffer', () => {
expect(instance.serialize({ data: Buffer.from('buffer') })).toEqual({
headers: undefined,
data: jsonCodec.encode({ data: Buffer.from('buffer') }),
data: JSON.stringify({ data: Buffer.from('buffer') }),
});
});
it('array', () => {
expect(instance.serialize({ data: [1, 2, 3, 4, 5] })).toEqual({
headers: undefined,
data: jsonCodec.encode({ data: [1, 2, 3, 4, 5] }),
data: JSON.stringify({ data: [1, 2, 3, 4, 5] }),
});
});
@@ -56,7 +54,7 @@ describe('NatsRecordSerializer', () => {
const serObject = { prop: 'value' };
expect(instance.serialize({ data: serObject })).toEqual({
headers: undefined,
data: jsonCodec.encode({ data: serObject }),
data: JSON.stringify({ data: serObject }),
});
});
@@ -73,7 +71,7 @@ describe('NatsRecordSerializer', () => {
}),
).toEqual({
headers: natsHeaders,
data: jsonCodec.encode({
data: JSON.stringify({
data: {
value: 'string',
},

View File

@@ -1,11 +1,10 @@
import { JSONCodec } from 'nats';
import { NO_MESSAGE_HANDLER } from '../../constants.js';
import { BaseRpcContext } from '../../ctx-host/base-rpc.context.js';
import { NatsContext } from '../../ctx-host/index.js';
import { ServerNats } from '../../server/server-nats.js';
import { objectToMap } from './utils/object-to-map.js';
// type NatsMsg = import('nats').Msg;
// type NatsMsg = import('@nats-io/nats-core').Msg;
type NatsMsg = any;
describe('ServerNats', () => {
@@ -44,7 +43,7 @@ describe('ServerNats', () => {
vi.spyOn(server, 'start').mockImplementation(() => {
throw error;
});
await server.listen(callbackSpy);
await server.listen(callbackSpy as any);
expect(callbackSpy).toHaveBeenCalledWith(error);
});
});
@@ -178,23 +177,24 @@ describe('ServerNats', () => {
beforeEach(() => {
getPublisherSpy = vi.fn();
vi.spyOn(server, 'getPublisher').mockImplementation(
() => getPublisherSpy,
() => getPublisherSpy as any,
);
});
it('should call "handleEvent" if identifier is not present', async () => {
const handleEventSpy = vi.spyOn(server, 'handleEvent');
const data = JSONCodec().encode({ id: 10 });
const data = JSON.stringify({ id: 10 });
const natsMsg: NatsMsg = {
data,
subject: channel,
sid: +id,
respond: vi.fn(),
json: () => JSON.parse(data),
};
await server.handleMessage(channel, natsMsg);
expect(handleEventSpy).toHaveBeenCalled();
});
it(`should publish NO_MESSAGE_HANDLER if pattern does not exist in messageHandlers object`, async () => {
const data = JSONCodec().encode({
const data = JSON.stringify({
id,
pattern: 'test',
data: 'test',
@@ -204,6 +204,7 @@ describe('ServerNats', () => {
subject: channel,
sid: +id,
respond: vi.fn(),
json: () => JSON.parse(data),
};
await server.handleMessage(channel, natsMsg);
@@ -222,7 +223,7 @@ describe('ServerNats', () => {
const headers = {};
const natsContext = new NatsContext([channel, headers]);
const data = JSONCodec().encode({
const data = JSON.stringify({
pattern: channel,
data: 'test',
id,
@@ -233,6 +234,7 @@ describe('ServerNats', () => {
sid: +id,
respond: vi.fn(),
headers,
json: () => JSON.parse(data),
};
await server.handleMessage(channel, natsMsg);
expect(handler).toHaveBeenCalledWith('test', natsContext);
@@ -261,13 +263,13 @@ describe('ServerNats', () => {
sid: +id,
respond: vi.fn(),
reply: replyTo,
};
} as NatsMsg;
const publisher = server.getPublisher(natsMsg, id, context);
const respond = 'test';
publisher({ respond, id });
expect(natsMsg.respond).toHaveBeenCalledWith(
JSONCodec().encode({ respond, id }),
JSON.stringify({ respond, id }),
expect.objectContaining({}),
);
});
@@ -279,7 +281,7 @@ describe('ServerNats', () => {
reply: replyTo,
sid: +id,
respond: vi.fn(),
};
} as NatsMsg;
const publisher = server.getPublisher(natsMsg, id, context);
const respond = 'test';
@@ -321,18 +323,18 @@ describe('ServerNats', () => {
const serverMock = {
status: vi.fn().mockReturnValue({
async *[Symbol.asyncIterator]() {
yield { type: 'disconnect', data: 'localhost' };
yield { type: 'error', data: {} };
yield { type: 'disconnect' };
yield { type: 'error', error: 'Test error' };
},
}),
};
await server.handleStatusUpdates(serverMock as any);
expect(logErrorSpy).toHaveBeenCalledTimes(2);
expect(logErrorSpy).toHaveBeenCalledWith(
`NatsError: type: "disconnect", data: "localhost".`,
`NatsError: type: "disconnect".`,
);
expect(logErrorSpy).toHaveBeenCalledWith(
`NatsError: type: "error", data: "{}".`,
`NatsError: type: "error", error: "Test error".`,
);
});
it('should log other statuses as "logs"', async () => {