feature(@nestjs/microservices) expose connect method, improvements & fixes

This commit is contained in:
Kamil Myśliwiec
2018-05-09 17:21:21 +02:00
parent 7b950ffa6e
commit 6016f8d175
31 changed files with 660 additions and 415 deletions

View File

@@ -0,0 +1,59 @@
import * as express from 'express';
import * as request from 'supertest';
import { Test } from '@nestjs/testing';
import { INestApplication } from '@nestjs/common';
import { join } from 'path';
import { DisconnectedClientController } from '../src/disconnected.controller';
import { Transport } from '@nestjs/microservices';
describe('Disconnected client', () => {
let server;
let app: INestApplication;
beforeEach(async () => {
const module = await Test.createTestingModule({
controllers: [DisconnectedClientController],
}).compile();
server = express();
app = module.createNestApplication(server);
await app.init();
});
it(`TCP`, () => {
return request(server)
.post('/')
.send({
transport: Transport.TCP,
})
.expect(408);
});
it(`REDIS`, () => {
return request(server)
.post('/')
.send({
transport: Transport.REDIS,
options: {
url: 'redis://localhost:3333',
},
})
.expect(408);
});
it(`NATS`, () => {
return request(server)
.post('/')
.send({
transport: Transport.NATS,
options: {
url: 'nats://localhost:4224',
},
})
.expect(408);
});
afterEach(async () => {
await app.close();
});
});

View File

@@ -0,0 +1,33 @@
import {
Controller,
Post,
Body,
RequestTimeoutException,
InternalServerErrorException,
} from '@nestjs/common';
import { Observable, throwError } from 'rxjs';
import { ClientProxyFactory } from '@nestjs/microservices';
import { tap, catchError } from 'rxjs/operators';
@Controller()
export class DisconnectedClientController {
@Post()
call(@Body() options): Observable<number> {
const client = ClientProxyFactory.create(options);
return client
.send<number, number[]>({ cmd: 'none' }, [1, 2, 3])
.pipe(
tap(
console.log.bind(console, 'data'),
console.error.bind(console, 'error'),
),
catchError(({ code }) =>
throwError(
code === 'ECONNREFUSED' || code === 'CONN_ERR'
? new RequestTimeoutException('ECONNREFUSED')
: new InternalServerErrorException(),
),
),
);
}
}

View File

@@ -4,8 +4,8 @@ import {
MessagePattern,
ClientProxy,
Transport,
GrpcRoute,
ClientGrpc,
GrpcMethod,
} from '@nestjs/microservices';
import { Observable, of } from 'rxjs';
import { join } from 'path';
@@ -28,7 +28,7 @@ export class GrpcController {
return svc.sum(data);
}
@GrpcRoute('Math', 'Sum')
@GrpcMethod('Math')
async sum({ data }: { data: number[] }): Promise<any> {
return of({
result: data.reduce((a, b) => a + b),

View File

@@ -123,6 +123,12 @@ export class ClientGrpcProxy extends ClientProxy implements ClientGrpc {
this.grpcClient = null;
}
public async connect(): Promise<any> {
throw new Error(
'The "connect()" method is not supported in gRPC mode.',
);
}
protected async publish(partialPacket, callback: (packet) => any) {
throw new Error(
'Method is not supported in gRPC mode. Use ClientGrpc instead (learn more in the documentation).',

View File

@@ -12,6 +12,7 @@ import { WritePacket, MqttOptions } from './../interfaces';
import { ReadPacket, PacketId } from './../interfaces';
import { MqttClient } from '../external/mqtt-client.interface';
import { loadPackage } from '@nestjs/common/utils/load-package.util';
import { ECONNREFUSED } from './constants';
let mqttPackage: any = {};
@@ -27,48 +28,6 @@ export class ClientMqtt extends ClientProxy {
mqttPackage = loadPackage('mqtt', ClientMqtt.name);
}
protected publish(
partialPacket: ReadPacket,
callback: (packet: WritePacket) => any,
) {
if (!this.mqttClient) {
this.init(callback);
}
const packet = this.assignPacketId(partialPacket);
const pattern = JSON.stringify(partialPacket.pattern);
const responseChannel = this.getResPatternName(pattern);
const responseCallback = (channel: string, buffer: Buffer) => {
const { err, response, isDisposed, id } = JSON.parse(
buffer.toString(),
) as WritePacket & PacketId;
if (id !== packet.id) {
return undefined;
}
if (isDisposed || err) {
callback({
err,
response: null,
isDisposed: true,
});
this.mqttClient.unsubscribe(channel);
this.mqttClient.removeListener(MESSAGE_EVENT, responseCallback);
return;
}
callback({
err,
response,
});
};
this.mqttClient.on(MESSAGE_EVENT, responseCallback);
this.mqttClient.subscribe(responseChannel);
this.mqttClient.publish(
this.getAckPatternName(pattern),
JSON.stringify(packet),
);
return responseCallback;
}
public getAckPatternName(pattern: string): string {
return `${pattern}_ack`;
}
@@ -82,27 +41,69 @@ export class ClientMqtt extends ClientProxy {
this.mqttClient = null;
}
public init(callback: (...args) => any) {
public connect(): Promise<any> {
this.mqttClient = this.createClient();
this.handleError(this.mqttClient, callback);
return new Promise((resolve, reject) => {
this.handleError(this.mqttClient);
this.connect$(this.mqttClient)
.subscribe(resolve, reject);
});
}
public createClient(): MqttClient {
return mqttPackage.connect(this.url, this.options.options as MqttOptions);
}
public handleError(client: MqttClient, callback: (...args) => any) {
const errorCallback = err => {
if (err.code === 'ECONNREFUSED') {
callback(err, null);
this.mqttClient = null;
public handleError(client: MqttClient) {
client.addListener(
ERROR_EVENT,
err => err.code !== ECONNREFUSED && this.logger.error(err),
);
}
protected async publish(
partialPacket: ReadPacket,
callback: (packet: WritePacket) => any,
): Promise<any> {
try {
if (!this.mqttClient) {
await this.connect();
}
this.logger.error(err);
};
client.addListener(ERROR_EVENT, errorCallback);
client.on(CONNECT_EVENT, () => {
client.removeListener(ERROR_EVENT, errorCallback);
client.addListener(ERROR_EVENT, err => this.logger.error(err));
});
const packet = this.assignPacketId(partialPacket);
const pattern = JSON.stringify(partialPacket.pattern);
const responseChannel = this.getResPatternName(pattern);
const responseCallback = (channel: string, buffer: Buffer) => {
const { err, response, isDisposed, id } = JSON.parse(
buffer.toString(),
) as WritePacket & PacketId;
if (id !== packet.id) {
return undefined;
}
if (isDisposed || err) {
callback({
err,
response: null,
isDisposed: true,
});
this.mqttClient.unsubscribe(channel);
this.mqttClient.removeListener(MESSAGE_EVENT, responseCallback);
return;
}
callback({
err,
response,
});
};
this.mqttClient.on(MESSAGE_EVENT, responseCallback);
this.mqttClient.subscribe(responseChannel);
this.mqttClient.publish(
this.getAckPatternName(pattern),
JSON.stringify(packet),
);
return responseCallback;
}
catch (err) {
callback({ err });
}
}
}

View File

@@ -10,6 +10,7 @@ import {
} from './../interfaces';
import { Client } from '../external/nats-client.interface';
import { loadPackage } from '@nestjs/common/utils/load-package.util';
import { CONN_ERR } from './constants';
let natsPackage: any = {};
@@ -25,43 +26,6 @@ export class ClientNats extends ClientProxy {
natsPackage = loadPackage('nats', ClientNats.name);
}
protected async publish(
partialPacket: ReadPacket,
callback: (packet: WritePacket) => any,
) {
if (!this.natsClient) {
await this.init(callback);
}
const packet = this.assignPacketId(partialPacket);
const pattern = JSON.stringify(partialPacket.pattern);
const responseChannel = this.getResPatternName(pattern);
const subscriptionHandler = (message: WritePacket & PacketId) => {
if (message.id !== packet.id) {
return undefined;
}
const { err, response, isDisposed } = message;
if (isDisposed || err) {
callback({
err,
response: null,
isDisposed: true,
});
return this.natsClient.unsubscribe(subscriptionId);
}
callback({
err,
response,
});
};
const subscriptionId = this.natsClient.subscribe(
responseChannel,
subscriptionHandler,
);
this.natsClient.publish(this.getAckPatternName(pattern), packet as any);
return subscriptionHandler;
}
public getAckPatternName(pattern: string): string {
return `${pattern}_ack`;
}
@@ -75,33 +39,69 @@ export class ClientNats extends ClientProxy {
this.natsClient = null;
}
public async init(callback: (...args) => any) {
public async connect(): Promise<any> {
this.natsClient = await this.createClient();
this.handleError(this.natsClient, callback);
return new Promise((resolve, reject) => {
this.handleError(this.natsClient);
this.connect$(this.natsClient)
.subscribe(resolve, reject);
});
}
public createClient(): Promise<Client> {
const options = this.options.options || ({} as NatsOptions);
const client = natsPackage.connect({
...(options as any),
const options: any = this.options.options || ({} as NatsOptions);
return natsPackage.connect({
...options,
url: this.url,
json: true,
});
return new Promise(resolve => client.on(CONNECT_EVENT, resolve));
}
public handleError(client: Client, callback: (...args) => any) {
const errorCallback = err => {
if (err.code === 'ECONNREFUSED') {
callback(err, null);
this.natsClient = null;
public handleError(client: Client) {
client.addListener(
ERROR_EVENT,
err => err.code !== CONN_ERR && this.logger.error(err),
);
}
protected async publish(
partialPacket: ReadPacket,
callback: (packet: WritePacket) => any,
): Promise<any> {
try {
if (!this.natsClient) {
await this.connect();
}
this.logger.error(err);
};
client.addListener(ERROR_EVENT, errorCallback);
client.on(CONNECT_EVENT, () => {
client.removeListener(ERROR_EVENT, errorCallback);
client.addListener(ERROR_EVENT, err => this.logger.error(err));
});
const packet = this.assignPacketId(partialPacket);
const pattern = JSON.stringify(partialPacket.pattern);
const responseChannel = this.getResPatternName(pattern);
const subscriptionHandler = (message: WritePacket & PacketId) => {
if (message.id !== packet.id) {
return undefined;
}
const { err, response, isDisposed } = message;
if (isDisposed || err) {
callback({
err,
response: null,
isDisposed: true,
});
return this.natsClient.unsubscribe(subscriptionId);
}
callback({
err,
response,
});
};
const subscriptionId = this.natsClient.subscribe(
responseChannel,
subscriptionHandler,
);
this.natsClient.publish(this.getAckPatternName(pattern), packet as any);
return subscriptionHandler;
} catch (err) {
callback({ err });
}
}
}

View File

@@ -7,23 +7,31 @@ import {
WritePacket,
ClientOptions,
} from './../interfaces';
import { fromEvent, merge } from 'rxjs';
import { map, take, tap } from 'rxjs/operators';
import { ERROR_EVENT, CONNECT_EVENT } from '../constants';
export abstract class ClientProxy {
public abstract connect(): Promise<any>;
public abstract close(): any;
public send<TResult = any, TInput = any>(
pattern: any,
data: TInput,
): Observable<TResult> {
if (isNil(pattern) || isNil(data)) {
return _throw(new InvalidMessageException());
}
return new Observable((observer: Observer<TResult>) => {
this.publish({ pattern, data }, this.createObserver(observer));
});
}
protected abstract publish(
packet: ReadPacket,
callback: (packet: WritePacket) => void,
);
public send<T = any>(pattern: any, data: any): Observable<T> {
if (isNil(pattern) || isNil(data)) {
return _throw(new InvalidMessageException());
}
return new Observable((observer: Observer<T>) => {
this.publish({ pattern, data }, this.createObserver(observer));
});
}
protected createObserver<T>(
observer: Observer<T>,
): (packet: WritePacket) => void {
@@ -45,6 +53,20 @@ export abstract class ClientProxy {
return Object.assign(packet, { id });
}
protected connect$(
instance: any,
errorEvent = ERROR_EVENT,
connectEvent = CONNECT_EVENT,
): Observable<any> {
const error$ = fromEvent(instance, errorEvent).pipe(
map(err => {
throw err;
}),
);
const connect$ = fromEvent(instance, connectEvent);
return merge(error$, connect$).pipe(take(1));
}
protected getOptionsProp<T extends { options? }>(
obj: ClientOptions,
prop: keyof T['options'],
@@ -52,4 +74,4 @@ export abstract class ClientProxy {
) {
return obj && obj.options ? obj.options[prop as any] : defaultValue;
}
}
}

View File

@@ -20,6 +20,8 @@ import {
RetryStrategyOptions,
} from '../external/redis.interface';
import { loadPackage } from '@nestjs/common/utils/load-package.util';
import { Subject, fromEvent, merge, zip } from 'rxjs';
import { take } from 'rxjs/operators';
let redisPackage: any = {};
@@ -38,58 +40,6 @@ export class ClientRedis extends ClientProxy {
redisPackage = loadPackage('redis', ClientRedis.name);
}
protected async publish(
partialPacket: ReadPacket,
callback: (packet: WritePacket) => any,
) {
if (!this.pubClient || !this.subClient) {
this.init(callback);
}
const packet = this.assignPacketId(partialPacket);
const pattern = JSON.stringify(partialPacket.pattern);
const responseChannel = this.getResPatternName(pattern);
const responseCallback = (channel: string, buffer: string) => {
const { err, response, isDisposed, id } = JSON.parse(
buffer,
) as WritePacket & PacketId;
if (id !== packet.id) {
return undefined;
}
if (isDisposed || err) {
callback({
err,
response: null,
isDisposed: true,
});
this.subClient.unsubscribe(channel);
this.subClient.removeListener(MESSAGE_EVENT, responseCallback);
return;
}
callback({
err,
response,
});
};
this.subClient.on(MESSAGE_EVENT, responseCallback);
this.subClient.subscribe(responseChannel);
await new Promise(resolve => {
const handler = channel => {
if (channel && channel !== responseChannel) {
return undefined;
}
this.subClient.removeListener(SUBSCRIBE, handler);
resolve();
};
this.subClient.on(SUBSCRIBE, handler);
});
this.pubClient.publish(
this.getAckPatternName(pattern),
JSON.stringify(packet),
);
return responseCallback;
}
public getAckPatternName(pattern: string): string {
return `${pattern}_ack`;
}
@@ -104,38 +54,37 @@ export class ClientRedis extends ClientProxy {
this.pubClient = this.subClient = null;
}
public init(callback: (...args) => any) {
this.pubClient = this.createClient();
this.subClient = this.createClient();
public connect(): Promise<any> {
return new Promise((resolve, reject) => {
const error$ = new Subject<Error>();
this.handleError(this.pubClient, callback);
this.handleError(this.subClient, callback);
this.pubClient = this.createClient(error$);
this.subClient = this.createClient(error$);
this.handleError(this.pubClient);
this.handleError(this.subClient);
const pubConnect$ = fromEvent(this.pubClient, CONNECT_EVENT);
const subClient$ = fromEvent(this.subClient, CONNECT_EVENT);
merge(error$, zip(pubConnect$, subClient$))
.pipe(take(1))
.subscribe(resolve, reject);
});
}
public createClient(): RedisClient {
public createClient(error$: Subject<Error>): RedisClient {
return redisPackage.createClient({
...this.getClientOptions(),
...this.getClientOptions(error$),
url: this.url,
});
}
public handleError(client: RedisClient, callback: (...args) => any) {
const errorCallback = err => {
if (err.code === 'ECONNREFUSED') {
callback(err, null);
this.pubClient = this.subClient = null;
}
this.logger.error(err);
};
client.addListener(ERROR_EVENT, errorCallback);
client.on(CONNECT_EVENT, () => {
client.removeListener(ERROR_EVENT, errorCallback);
client.addListener(ERROR_EVENT, err => this.logger.error(err));
});
public handleError(client: RedisClient) {
client.addListener(ERROR_EVENT, err => this.logger.error(err));
}
public getClientOptions(): Partial<ClientOpts> {
const retry_strategy = options => this.createRetryStrategy(options);
public getClientOptions(error$: Subject<Error>): Partial<ClientOpts> {
const retry_strategy = options => this.createRetryStrategy(options, error$);
return {
retry_strategy,
};
@@ -143,7 +92,12 @@ export class ClientRedis extends ClientProxy {
public createRetryStrategy(
options: RetryStrategyOptions,
): undefined | number {
error$: Subject<Error>,
): undefined | number | Error {
if (options.error && (options.error as any).code === 'ECONNREFUSED') {
error$.error(options.error);
return options.error;
}
if (
this.isExplicitlyTerminated ||
!this.getOptionsProp<RedisOptions>(this.options, 'retryAttempts') ||
@@ -154,4 +108,61 @@ export class ClientRedis extends ClientProxy {
}
return this.getOptionsProp(this.options, 'retryDelay') || 0;
}
protected async publish(
partialPacket: ReadPacket,
callback: (packet: WritePacket) => any,
) {
try {
if (!this.pubClient || !this.subClient) {
await this.connect();
}
const packet = this.assignPacketId(partialPacket);
const pattern = JSON.stringify(partialPacket.pattern);
const responseChannel = this.getResPatternName(pattern);
const responseCallback = (channel: string, buffer: string) => {
const { err, response, isDisposed, id } = JSON.parse(
buffer,
) as WritePacket & PacketId;
if (id !== packet.id) {
return undefined;
}
if (isDisposed || err) {
callback({
err,
response: null,
isDisposed: true,
});
this.subClient.unsubscribe(channel);
this.subClient.removeListener(MESSAGE_EVENT, responseCallback);
return;
}
callback({
err,
response,
});
};
this.subClient.on(MESSAGE_EVENT, responseCallback);
this.subClient.subscribe(responseChannel);
await new Promise(resolve => {
const handler = channel => {
if (channel && channel !== responseChannel) {
return undefined;
}
this.subClient.removeListener(SUBSCRIBE, handler);
resolve();
};
this.subClient.on(SUBSCRIBE, handler);
});
this.pubClient.publish(
this.getAckPatternName(pattern),
JSON.stringify(packet),
);
return responseCallback;
}
catch (err) {
callback({ err });
}
}
}

View File

@@ -15,6 +15,8 @@ import {
CLOSE_EVENT,
} from './../constants';
import { WritePacket, ReadPacket, PacketId } from './../interfaces';
import { tap } from 'rxjs/operators';
import { ECONNREFUSED } from './constants';
export class ClientTCP extends ClientProxy {
private readonly logger = new Logger(ClientTCP.name);
@@ -33,41 +35,18 @@ export class ClientTCP extends ClientProxy {
TCP_DEFAULT_HOST;
}
public init(callback: (...args) => any): Promise<JsonSocket> {
public connect(): Promise<any> {
this.socket = this.createSocket();
return new Promise(resolve => {
this.bindEvents(this.socket, callback);
this.socket._socket.once(CONNECT_EVENT, () => {
this.isConnected = true;
resolve(this.socket);
});
return new Promise((resolve, reject) => {
this.bindEvents(this.socket);
this.connect$(this.socket._socket)
.pipe(tap(() => (this.isConnected = true)))
.subscribe(resolve, reject);
this.socket.connect(this.port, this.host);
});
}
protected async publish(
partialPacket: ReadPacket,
callback: (packet: WritePacket) => any,
) {
const handleRequestResponse = (jsonSocket: JsonSocket) => {
const packet = this.assignPacketId(partialPacket);
jsonSocket.sendMessage(packet);
const listener = (buffer: WritePacket & PacketId) => {
if (buffer.id !== packet.id) {
return undefined;
}
this.handleResponse(jsonSocket, callback, buffer, listener);
};
jsonSocket.on(MESSAGE_EVENT, listener);
};
if (this.isConnected) {
return handleRequestResponse(this.socket);
}
const socket = await this.init(callback);
handleRequestResponse(socket);
return;
}
public handleResponse(
socket: JsonSocket,
callback: (packet: WritePacket) => any,
@@ -98,15 +77,15 @@ export class ClientTCP extends ClientProxy {
this.handleClose();
}
public bindEvents(socket: JsonSocket, callback: (...args) => any) {
socket.on(ERROR_EVENT, err => this.handleError(err, callback));
public bindEvents(socket: JsonSocket) {
socket.on(
ERROR_EVENT,
err => err.code !== ECONNREFUSED && this.handleError(err),
);
socket.on(CLOSE_EVENT, () => this.handleClose());
}
public handleError(err: any, callback: (...args) => any) {
if (err.code === 'ECONNREFUSED') {
callback(err, null);
}
public handleError(err: any) {
this.logger.error(err);
}
@@ -114,4 +93,29 @@ export class ClientTCP extends ClientProxy {
this.isConnected = false;
this.socket = null;
}
protected async publish(
partialPacket: ReadPacket,
callback: (packet: WritePacket) => any,
) {
try {
if (!this.isConnected) {
await this.connect();
}
const handleRequestResponse = (jsonSocket: JsonSocket) => {
const packet = this.assignPacketId(partialPacket);
jsonSocket.sendMessage(packet);
const listener = (buffer: WritePacket & PacketId) => {
if (buffer.id !== packet.id) {
return undefined;
}
this.handleResponse(jsonSocket, callback, buffer, listener);
};
jsonSocket.on(MESSAGE_EVENT, listener);
};
handleRequestResponse(this.socket);
} catch (err) {
callback({ err });
}
}
}

View File

@@ -0,0 +1,2 @@
export const ECONNREFUSED = 'ECONNREFUSED';
export const CONN_ERR = 'CONN_ERR';

View File

@@ -43,9 +43,17 @@ export class RpcContextCreator {
callback,
module,
);
const handler = (args: any[]) => async () => {
const [data, ...params] = args;
const result = await this.pipesConsumer.applyPipes(
data,
{ metatype },
pipes,
);
return callback.call(instance, result, ...params);
};
return this.rpcProxy.create(async (...args) => {
const [data, ...params] = args;
const canActivate = await this.guardsConsumer.tryActivate(
guards,
args,
@@ -55,21 +63,12 @@ export class RpcContextCreator {
if (!canActivate) {
throw new RpcException(FORBIDDEN_MESSAGE);
}
const handler = async () => {
const result = await this.pipesConsumer.applyPipes(
data,
{ metatype },
pipes,
);
return callback.call(instance, result, ...params);
};
return await this.interceptorsConsumer.intercept(
interceptors,
args,
instance,
callback,
handler,
handler(args),
);
}, exceptionHandler);
}

View File

@@ -30,7 +30,7 @@ export class ListenersController {
targetCallback,
module,
);
server.add(pattern, proxy);
server.addHandler(pattern, proxy);
});
}

View File

@@ -62,9 +62,9 @@ export class MicroservicesModule {
server: Server & CustomTransportStrategy,
module: string,
) {
controllers.forEach(({ instance }) => {
this.listenersController.bindPatternHandlers(instance, server, module);
});
controllers.forEach(({ instance }) =>
this.listenersController.bindPatternHandlers(instance, server, module),
);
}
public bindClients(controllers: Map<string, InstanceWrapper<Controller>>) {

View File

@@ -130,7 +130,10 @@ export class ServerRedis extends Server implements CustomTransportStrategy {
public createRetryStrategy(
options: RetryStrategyOptions,
): undefined | number {
): undefined | number | void {
if (options.error && (options.error as any).code === 'ECONNREFUSED') {
return this.logger.error(`Error ECONNREFUSED: ${this.url}`);
}
if (
this.isExplicitlyTerminated ||
!this.getOptionsProp<RedisOptions>(this.options, 'retryAttempts') ||

View File

@@ -26,7 +26,10 @@ export abstract class Server {
return this.messageHandlers[pattern] ? this.messageHandlers[pattern] : null;
}
public add(pattern, callback: (data) => Promise<Observable<any>>) {
public addHandler(
pattern: any,
callback: (data) => Promise<Observable<any>>,
) {
this.messageHandlers[JSON.stringify(pattern)] = callback;
}

View File

@@ -133,4 +133,10 @@ describe('ClientGrpcProxy', () => {
expect(client['publish'](null, null)).to.eventually.throws(Error);
});
});
describe('connect', () => {
it('should throw exception', () => {
expect(client.connect()).to.eventually.throws(Error);
});
});
});

View File

@@ -27,7 +27,7 @@ describe('ClientMqtt', () => {
onSpy: sinon.SinonSpy,
removeListenerSpy: sinon.SinonSpy,
unsubscribeSpy: sinon.SinonSpy,
initSpy: sinon.SinonSpy,
connectSpy: sinon.SinonStub,
mqttClient;
beforeEach(() => {
@@ -46,22 +46,22 @@ describe('ClientMqtt', () => {
addListener: () => ({}),
};
(client as any).mqttClient = mqttClient;
initSpy = sinon.spy(client, 'init');
connectSpy = sinon.stub(client, 'connect');
});
afterEach(() => {
initSpy.restore();
connectSpy.restore();
});
it('should not call "init()" when mqtt client is not null', () => {
client['publish'](msg, () => {});
expect(initSpy.called).to.be.false;
it('should not call "connect()" when mqtt client is not null', async () => {
await client['publish'](msg, () => {});
expect(connectSpy.called).to.be.false;
});
it('should call "init()" when mqtt client is null', () => {
it('should call "connect()" when mqtt client is null', async () => {
(client as any).mqttClient = null;
client['publish'](msg, () => {});
expect(initSpy.called).to.be.true;
await client['publish'](msg, () => {});
expect(connectSpy.called).to.be.true;
});
it('should subscribe to response pattern name', () => {
client['publish'](msg, () => {});
it('should subscribe to response pattern name', async () => {
await client['publish'](msg, () => {});
expect(subscribeSpy.calledWith(`"${pattern}"_res`)).to.be.true;
});
it('should publish stringified message to acknowledge pattern name', async () => {
@@ -69,8 +69,8 @@ describe('ClientMqtt', () => {
expect(publishSpy.calledWith(`"${pattern}"_ack`, JSON.stringify(msg))).to
.be.true;
});
it('should listen on messages', () => {
client['publish'](msg, () => {});
it('should listen on messages', async () => {
await client['publish'](msg, () => {});
expect(onSpy.called).to.be.true;
});
describe('responseCallback', () => {
@@ -171,6 +171,17 @@ describe('ClientMqtt', () => {
});
});
});
describe('when connect throws', () => {
it('should call callback with error', async () => {
const err = new Error();
connectSpy.throws(err);
const callbackSpy = sinon.spy();
(client as any).mqttClient = null;
await client['publish'](msg, callbackSpy);
expect(callbackSpy.calledWith({ err })).to.be.true;
});
});
});
describe('close', () => {
let endSpy: sinon.SinonSpy;
@@ -188,18 +199,21 @@ describe('ClientMqtt', () => {
expect(endSpy.called).to.be.false;
});
});
describe('init', () => {
describe('connect', () => {
let createClientSpy: sinon.SinonSpy;
let handleErrorsSpy: sinon.SinonSpy;
let connect$Spy: sinon.SinonSpy;
beforeEach(() => {
beforeEach(async () => {
createClientSpy = sinon.spy(client, 'createClient');
handleErrorsSpy = sinon.spy(client, 'handleError');
client.init(sinon.spy());
connect$Spy = sinon.spy(client, 'connect$');
await client.connect();
});
afterEach(() => {
createClientSpy.restore();
handleErrorsSpy.restore();
connect$Spy.restore();
});
it('should call "createClient" once', () => {
expect(createClientSpy.called).to.be.true;
@@ -207,32 +221,18 @@ describe('ClientMqtt', () => {
it('should call "handleError" once', () => {
expect(handleErrorsSpy.called).to.be.true;
});
it('should call "connect$" once', () => {
expect(connect$Spy.called).to.be.true;
});
});
describe('handleError', () => {
it('should bind error event handler and call callback with error', () => {
const callback = sinon.spy();
const removeListenerSpy = sinon.spy();
const addListener = (name, fn) => {
const err = { code: 'ECONNREFUSED' };
fn(err);
expect(name).to.be.eql(ERROR_EVENT);
expect(callback.called).to.be.true;
expect(callback.calledWith(err, null)).to.be.true;
it('should bind error event handler', () => {
const callback = sinon.stub().callsFake((_, fn) => fn({ code: 'test' }));
const emitter = {
addListener: callback,
};
const onCallback = (name, fn) => {
fn();
expect(name).to.be.eql(CONNECT_EVENT);
expect(removeListenerSpy.called).to.be.true;
};
const stream = {
addListener,
on: onCallback,
removeListener: removeListenerSpy,
};
client.handleError(stream as any, callback);
client.handleError(emitter as any);
expect(callback.getCall(0).args[0]).to.be.eql(ERROR_EVENT);
});
});
});

View File

@@ -30,7 +30,7 @@ describe('ClientNats', () => {
onSpy: sinon.SinonSpy,
removeListenerSpy: sinon.SinonSpy,
unsubscribeSpy: sinon.SinonSpy,
initSpy: sinon.SinonSpy,
connectSpy: sinon.SinonStub,
natsClient,
createClient: sinon.SinonStub;
@@ -51,23 +51,23 @@ describe('ClientNats', () => {
};
(client as any).natsClient = natsClient;
initSpy = sinon.stub(client, 'init').callsFake(() => {
connectSpy = sinon.stub(client, 'connect').callsFake(() => {
(client as any).natsClient = natsClient;
});
createClient = sinon.stub(client, 'createClient').callsFake(() => client);
});
afterEach(() => {
initSpy.restore();
connectSpy.restore();
createClient.restore();
});
it('should not call "init()" when natsClient is not null', async () => {
it('should not call "connect()" when natsClient is not null', async () => {
await client['publish'](msg, () => {});
expect(initSpy.called).to.be.false;
expect(connectSpy.called).to.be.false;
});
it('should call "init()" when natsClient is null', async () => {
it('should call "connect()" when natsClient is null', async () => {
(client as any).natsClient = null;
await client['publish'](msg, () => {});
expect(initSpy.called).to.be.true;
expect(connectSpy.called).to.be.true;
});
it('should subscribe to response pattern name', async () => {
await client['publish'](msg, () => {});
@@ -168,6 +168,17 @@ describe('ClientNats', () => {
});
});
});
describe('when connect throws', () => {
it('should call callback with error', async () => {
const err = new Error();
connectSpy.throws(err);
const callbackSpy = sinon.spy();
(client as any).natsClient = null;
await client['publish'](msg, callbackSpy);
expect(callbackSpy.calledWith({ err })).to.be.true;
});
});
});
describe('close', () => {
let natsClose: sinon.SinonSpy;
@@ -182,13 +193,16 @@ describe('ClientNats', () => {
expect(natsClose.called).to.be.true;
});
});
describe('init', () => {
describe('connect', () => {
let createClientSpy: sinon.SinonSpy;
let handleErrorsSpy: sinon.SinonSpy;
let connect$Spy: sinon.SinonSpy;
const natsClient = {
addListener: sinon.spy(),
on: sinon.spy(),
on: (ev, fn) => ev === 'connect' ? fn() : null,
removeListener: sinon.spy(),
off: sinon.spy(),
};
beforeEach(async () => {
@@ -196,11 +210,14 @@ describe('ClientNats', () => {
.stub(client, 'createClient')
.callsFake(() => natsClient);
handleErrorsSpy = sinon.spy(client, 'handleError');
await client.init(sinon.spy());
connect$Spy = sinon.spy(client, 'connect$');
await client.connect();
});
afterEach(() => {
createClientSpy.restore();
handleErrorsSpy.restore();
connect$Spy.restore();
});
it('should call "createClient"', () => {
expect(createClientSpy.called).to.be.true;
@@ -208,32 +225,18 @@ describe('ClientNats', () => {
it('should call "handleError"', () => {
expect(handleErrorsSpy.called).to.be.true;
});
});
describe('handleError', () => {
it('should bind error event handler and call callback with error', () => {
const callback = sinon.spy();
const removeListenerSpy = sinon.spy();
const addListener = (name, fn) => {
const err = { code: 'ECONNREFUSED' };
fn(err);
expect(name).to.be.eql(ERROR_EVENT);
expect(callback.called).to.be.true;
expect(callback.calledWith(err, null)).to.be.true;
};
const onCallback = (name, fn) => {
fn();
expect(name).to.be.eql(CONNECT_EVENT);
expect(removeListenerSpy.called).to.be.true;
};
const stream = {
addListener,
on: onCallback,
removeListener: removeListenerSpy,
};
client.handleError(stream as any, callback);
it('should call "connect$" once', () => {
expect(connect$Spy.called).to.be.true;
});
});
});
describe('handleError', () => {
it('should bind error event handler', () => {
const callback = sinon.stub().callsFake((_, fn) => fn({ code: 'test' }));
const emitter = {
addListener: callback,
};
client.handleError(emitter as any);
expect(callback.getCall(0).args[0]).to.be.eql(ERROR_EVENT);
});
});
});

View File

@@ -4,6 +4,7 @@ import { ClientProxy } from '../../client/client-proxy';
import { Observable } from 'rxjs';
class TestClientProxy extends ClientProxy {
public async connect() {}
public publish(pattern, callback) {}
public close() {}
}

View File

@@ -2,6 +2,7 @@ import * as sinon from 'sinon';
import { expect } from 'chai';
import { ClientRedis } from '../../client/client-redis';
import { ERROR_EVENT, CONNECT_EVENT, MESSAGE_EVENT } from '../../constants';
import { Subject } from 'rxjs';
describe('ClientRedis', () => {
const test = 'test';
@@ -27,7 +28,7 @@ describe('ClientRedis', () => {
onSpy: sinon.SinonSpy,
removeListenerSpy: sinon.SinonSpy,
unsubscribeSpy: sinon.SinonSpy,
initSpy: sinon.SinonSpy,
connectSpy: sinon.SinonSpy,
sub,
pub;
@@ -48,20 +49,20 @@ describe('ClientRedis', () => {
pub = { publish: publishSpy };
(client as any).subClient = sub;
(client as any).pubClient = pub;
initSpy = sinon.spy(client, 'init');
connectSpy = sinon.spy(client, 'connect');
});
afterEach(() => {
initSpy.restore();
connectSpy.restore();
});
it('should not call "init()" when pub and sub are not null', () => {
it('should not call "connect()" when pub and sub are not null', () => {
client['publish'](msg, () => {});
expect(initSpy.called).to.be.false;
expect(connectSpy.called).to.be.false;
});
it('should call "init()" when pub and sub are null', () => {
it('should call "connect()" when pub and sub are null', () => {
(client as any).subClient = null;
(client as any).pubClient = null;
client['publish'](msg, () => {});
expect(initSpy.called).to.be.true;
expect(connectSpy.called).to.be.true;
});
it('should subscribe to response pattern name', () => {
client['publish'](msg, () => {});
@@ -110,7 +111,9 @@ describe('ClientRedis', () => {
expect(unsubscribeSpy.calledWith(`"${pattern}"_res`)).to.be.false;
});
it('should not remove listener', () => {
expect(removeListenerSpy.getCall(0).args[0]).to.not.be.eql(MESSAGE_EVENT);
expect(removeListenerSpy.getCall(0).args[0]).to.not.be.eql(
MESSAGE_EVENT,
);
});
});
describe('disposed and "id" is correct', () => {
@@ -123,9 +126,7 @@ describe('ClientRedis', () => {
callback = sinon.spy();
assignStub = sinon
.stub(client, 'assignPacketId')
.callsFake(packet =>
Object.assign(packet, { id }),
);
.callsFake(packet => Object.assign(packet, { id }));
subscription = await client['publish'](msg, callback);
subscription(channel, JSON.stringify({ isDisposed: true, id }));
});
@@ -134,11 +135,13 @@ describe('ClientRedis', () => {
it('should call callback with dispose param', () => {
expect(callback.called).to.be.true;
expect(callback.calledWith({
expect(
callback.calledWith({
isDisposed: true,
response: null,
err: undefined,
})).to.be.true;
}),
).to.be.true;
});
it('should unsubscribe to response pattern name', () => {
expect(unsubscribeSpy.calledWith(channel)).to.be.true;
@@ -157,9 +160,7 @@ describe('ClientRedis', () => {
callback = sinon.spy();
assignStub = sinon
.stub(client, 'assignPacketId')
.callsFake(packet =>
Object.assign(packet, { id }),
);
.callsFake(packet => Object.assign(packet, { id }));
subscription = await client['publish'](msg, callback);
subscription(channel, JSON.stringify({ isDisposed: true }));
});
@@ -206,14 +207,14 @@ describe('ClientRedis', () => {
expect(subClose.called).to.be.false;
});
});
describe('init', () => {
describe('connect', () => {
let createClientSpy: sinon.SinonSpy;
let handleErrorsSpy: sinon.SinonSpy;
beforeEach(() => {
beforeEach(async () => {
createClientSpy = sinon.spy(client, 'createClient');
handleErrorsSpy = sinon.spy(client, 'handleError');
client.init(sinon.spy());
await client.connect();
});
afterEach(() => {
createClientSpy.restore();
@@ -227,45 +228,29 @@ describe('ClientRedis', () => {
});
});
describe('handleError', () => {
it('should bind error event handler and call callback with error', () => {
const callback = sinon.spy();
const removeListenerSpy = sinon.spy();
const addListener = (name, fn) => {
const err = { code: 'ECONNREFUSED' };
fn(err);
expect(name).to.be.eql(ERROR_EVENT);
expect(callback.called).to.be.true;
expect(callback.calledWith(err, null)).to.be.true;
it('should bind error event handler', () => {
const callback = sinon.stub().callsFake((_, fn) => fn({ code: 'test' }));
const emitter = {
addListener: callback,
};
const onCallback = (name, fn) => {
fn();
expect(name).to.be.eql(CONNECT_EVENT);
expect(removeListenerSpy.called).to.be.true;
};
const stream = {
addListener,
on: onCallback,
removeListener: removeListenerSpy,
};
client.handleError(stream as any, callback);
client.handleError(emitter as any);
expect(callback.getCall(0).args[0]).to.be.eql(ERROR_EVENT);
});
});
describe('getClientOptions', () => {
it('should return options object with "retry_strategy" and call "createRetryStrategy"', () => {
const createSpy = sinon.spy(client, 'createRetryStrategy');
const { retry_strategy } = client.getClientOptions();
const { retry_strategy } = client.getClientOptions(new Subject());
retry_strategy({} as any);
expect(createSpy.called).to.be.true;
});
});
describe('createRetryStrategy', () => {
const subject = new Subject<Error>();
describe('when is terminated', () => {
it('should return undefined', () => {
(client as any).isExplicitlyTerminated = true;
const result = client.createRetryStrategy({} as any);
const result = client.createRetryStrategy({} as any, subject);
expect(result).to.be.undefined;
});
});
@@ -273,7 +258,7 @@ describe('ClientRedis', () => {
it('should return undefined', () => {
(client as any).options.options = {};
(client as any).options.options.retryAttempts = undefined;
const result = client.createRetryStrategy({} as any);
const result = client.createRetryStrategy({} as any, subject);
expect(result).to.be.undefined;
});
});
@@ -281,17 +266,30 @@ describe('ClientRedis', () => {
it('should return undefined', () => {
(client as any).options.options = {};
(client as any).options.options.retryAttempts = 3;
const result = client.createRetryStrategy({ attempt: 4 } as any);
const result = client.createRetryStrategy(
{ attempt: 4 } as any,
subject,
);
expect(result).to.be.undefined;
});
});
describe('when ECONNREFUSED', () => {
it('should return error', () => {
const error = { code: 'ECONNREFUSED' };
const result = client.createRetryStrategy({ error } as any, subject);
expect(result).to.be.eql(error);
});
});
describe('otherwise', () => {
it('should return delay (ms)', () => {
(client as any).options.options = {};
(client as any).isExplicitlyTerminated = false;
(client as any).options.options.retryAttempts = 3;
(client as any).options.options.retryDelay = 3;
const result = client.createRetryStrategy({ attempt: 2 } as any);
const result = client.createRetryStrategy(
{ attempt: 2 } as any,
subject,
);
expect(result).to.be.eql((client as any).options.options.retryDelay);
});
});

View File

@@ -1,14 +1,15 @@
import * as sinon from 'sinon';
import { expect } from 'chai';
import { ClientTCP } from '../../client/client-tcp';
import { MESSAGE_EVENT } from '../../constants';
import { MESSAGE_EVENT, ERROR_EVENT } from '../../constants';
describe('ClientTCP', () => {
const client = new ClientTCP({});
let socket: {
connect: sinon.SinonSpy;
connect: sinon.SinonStub;
publish: sinon.SinonSpy;
_socket: {
addListener: sinon.SinonStub,
removeListener: sinon.SinonSpy;
once: sinon.SinonStub;
};
@@ -23,10 +24,11 @@ describe('ClientTCP', () => {
event !== 'error' && event !== 'close' && callback({});
socket = {
connect: sinon.spy(),
connect: sinon.stub(),
publish: sinon.spy(),
on: sinon.stub().callsFake(onFakeCallback),
_socket: {
addListener: sinon.stub().callsFake(onFakeCallback),
removeListener: sinon.spy(),
once: sinon.stub().callsFake(onFakeCallback),
},
@@ -119,6 +121,17 @@ describe('ClientTCP', () => {
).to.be.true;
});
});
describe('when connect throws', () => {
it('should call callback with error', async () => {
const err = new Error();
const connectStub = sinon.stub(client, 'connect').throws(err);
const callbackSpy = sinon.spy();
(client as any).isConnected = false;
await client['publish']({} as any, callbackSpy);
expect(callbackSpy.calledWith({ err })).to.be.true;
});
});
});
describe('close', () => {
beforeEach(() => {
@@ -136,21 +149,14 @@ describe('ClientTCP', () => {
expect((client as any).socket).to.be.null;
});
});
describe('handleError', () => {
it('should call callback with error', () => {
const callback = sinon.spy();
const err = { code: 'ECONNREFUSED' };
client.handleError(err, callback);
expect(callback.called).to.be.true;
expect(callback.calledWith(err, null)).to.be.true;
});
it('should not call callback with error', () => {
const callback = sinon.spy();
const err = {};
client.handleError(err, callback);
expect(callback.called).to.be.false;
describe('bindEvents', () => {
it('should bind error event handler', () => {
const callback = sinon.stub().callsFake((_, fn) => fn({ code: 'test' }));
const emitter = {
on: callback,
};
client.bindEvents(emitter as any);
expect(callback.getCall(0).args[0]).to.be.eql(ERROR_EVENT);
});
});
});
});

View File

@@ -25,11 +25,11 @@ describe('ListenersController', () => {
(instance as any).metadataExplorer = metadataExplorer;
addSpy = sinon.spy();
server = {
add: addSpy,
addHandler: addSpy,
};
});
describe('bindPatternHandlers', () => {
it(`should call add method of server for each pattern handler`, () => {
it(`should call "addHandler" method of server for each pattern handler`, () => {
const handlers = [
{ pattern: 'test', targetCallback: 'tt' },
{ pattern: 'test2', targetCallback: '2' },

View File

@@ -195,6 +195,15 @@ describe('ServerRedis', () => {
expect(result).to.be.undefined;
});
});
describe('when ECONNREFUSED', () => {
it('should call logger', () => {
const loggerErrorSpy = sinon.spy((server as any).logger, 'error');
const result = server.createRetryStrategy(
{ error: { code: 'ECONNREFUSED' } } as any,
);
expect(loggerErrorSpy.called).to.be.true;
});
});
describe('otherwise', () => {
it('should return delay (ms)', () => {
(server as any).options.options = {};

View File

@@ -15,7 +15,7 @@ describe('Server', () => {
describe('add', () => {
it(`should add handler as a stringified pattern key`, () => {
server.add(pattern, callback as any);
server.addHandler(pattern, callback as any);
const handlers = server.getHandlers();
expect(handlers[JSON.stringify(pattern)]).to.equal(callback);

View File

@@ -1,6 +1,10 @@
import { expect } from 'chai';
import { PATTERN_METADATA } from '../../constants';
import { MessagePattern } from '../../utils/pattern.decorator';
import {
MessagePattern,
GrpcMethod,
createMethodMetadata,
} from '../../utils/pattern.decorator';
describe('@MessagePattern', () => {
const pattern = { role: 'test' };
@@ -13,3 +17,43 @@ describe('@MessagePattern', () => {
expect(metadata).to.be.eql(pattern);
});
});
describe('@GrpcMethod', () => {
class TestService {
@GrpcMethod()
public test() {}
@GrpcMethod('TestService2')
public test2() {}
@GrpcMethod('TestService2', 'Test2')
public test3() {}
}
it('should derive method and service name', () => {
const svc = new TestService();
const metadata = Reflect.getMetadata(PATTERN_METADATA, svc.test);
expect(metadata).to.be.eql({
service: TestService.name,
rpc: 'Test',
});
});
it('should derive method', () => {
const svc = new TestService();
const metadata = Reflect.getMetadata(PATTERN_METADATA, svc.test2);
expect(metadata).to.be.eql({
service: 'TestService2',
rpc: 'Test2',
});
});
it('should override both method and service', () => {
const svc = new TestService();
const metadata = Reflect.getMetadata(PATTERN_METADATA, svc.test3);
expect(metadata).to.be.eql({
service: 'TestService2',
rpc: 'Test2',
});
});
});

View File

@@ -0,0 +1,8 @@
import 'reflect-metadata';
import { Controller } from '@nestjs/common';
/**
* Defines the GrpcService. The service can inject dependencies through constructor.
* Those dependencies have to belong to the same module.
*/
export const GrpcService = Controller;

View File

@@ -1,2 +1,3 @@
export * from './client.decorator';
export * from './pattern.decorator';
export * from './grpc-service.decorator';

View File

@@ -16,7 +16,32 @@ export const MessagePattern = <T = PatternMetadata | string>(
};
/**
* Registers gRPC route handler for specified service.
* Registers gRPC method handler for specified service.
*/
export const GrpcRoute = (service: string, rpc: string) =>
MessagePattern({ service, rpc });
export function GrpcMethod(service?: string);
export function GrpcMethod(service: string, method?: string);
export function GrpcMethod(service: string, method?: string) {
return (target, key, descriptor: PropertyDescriptor) => {
const metadata = createMethodMetadata(target, key, service, method);
return MessagePattern(metadata)(target, key, descriptor);
};
}
export function createMethodMetadata(
target: any,
key: string,
service: string | undefined,
method: string | undefined,
) {
const capitalizeFirstLetter = (str: string) =>
str.charAt(0).toUpperCase() + str.slice(1);
if (!service) {
const { name } = target.constructor;
return { service: name, rpc: capitalizeFirstLetter(key) };
}
if (service && !method) {
return { service, rpc: capitalizeFirstLetter(key) };
}
return { service, rpc: method };
}

View File

@@ -43,6 +43,15 @@ export class WsContextCreator {
callback,
module,
);
const handler = (args: any[]) => async () => {
const [client, data, ...params] = args;
const result = await this.pipesConsumer.applyPipes(
data,
{ metatype },
pipes,
);
return callback.call(instance, client, result, ...params);
};
return this.wsProxy.create(async (...args) => {
const canActivate = await this.guardsConsumer.tryActivate(
@@ -54,21 +63,12 @@ export class WsContextCreator {
if (!canActivate) {
throw new WsException(FORBIDDEN_MESSAGE);
}
const handler = async () => {
const [client, data, ...params] = args;
const result = await this.pipesConsumer.applyPipes(
data,
{ metatype },
pipes,
);
return callback.call(instance, client, result, ...params);
};
return await this.interceptorsConsumer.intercept(
interceptors,
args,
instance,
callback,
handler,
handler(args),
);
}, exceptionHandler);
}

View File

@@ -18,7 +18,7 @@
"class-validator": "^0.7.2",
"grpc": "^1.10.0",
"reflect-metadata": "^0.1.12",
"rxjs": "^6.0.0",
"rxjs": "^6.1.0",
"typescript": "^2.8.0"
},
"devDependencies": {

View File

@@ -1,10 +1,11 @@
import { Controller, Get, OnModuleInit } from '@nestjs/common';
import { Get, OnModuleInit } from '@nestjs/common';
import {
ClientProxy,
Client,
MessagePattern,
GrpcRoute,
GrpcMethod,
ClientGrpc,
GrpcService,
} from '@nestjs/microservices';
import { Observable } from 'rxjs';
import { grpcClientOptions } from './../grpc-client.options';
@@ -15,7 +16,7 @@ interface HeroService {
findOne(data: { id: number }): Observable<any>;
}
@Controller()
@GrpcService()
export class HeroController implements OnModuleInit {
@Client(grpcClientOptions) private readonly client: ClientGrpc;
private heroService: HeroService;
@@ -29,7 +30,7 @@ export class HeroController implements OnModuleInit {
return this.heroService.findOne({ id: 1 });
}
@GrpcRoute('HeroService', 'FindOne')
@GrpcMethod('HeroService')
findOne(data: HeroById): Hero {
const items: Hero[] = [{ id: 1, name: 'John' }, { id: 2, name: 'Doe' }];
return items.find(({ id }) => id === data.id);