feat(@nestjs) reactive microservices, custom transport strategy, websockets adapter, exception filters breaking change, async pipes feature

This commit is contained in:
kamil.mysliwiec
2017-06-01 00:39:40 +02:00
parent 07d7768826
commit 9bf3661564
35 changed files with 527 additions and 125 deletions

View File

@@ -1 +0,0 @@
<!-- Love nest? Please consider supporting our collective: 👉 https://opencollective.com/nest/donate -->

View File

@@ -3,7 +3,8 @@ import { CustomException } from './exception.filter';
@Pipe()
export class ValidatorPipe implements PipeTransform {
public transform(value, metatype, token): any {
return value;
public async transform(value, metadata?) {
console.log(value, metadata);
return Promise.resolve(value);
}
}

View File

@@ -1,14 +0,0 @@
require('ts-node/register');
require('./example/math/microservice');

View File

@@ -7,8 +7,6 @@
"start": "concurrently \"npm run build:live\" \"npm run microservice:live\"",
"build:live": "nodemon -e ts --watch src index.js",
"build": "node index.js",
"microservice:live": "nodemon -e ts --watch src microservice.js",
"microservice": "node microservice.js",
"compile": "tsc -p tsconfig.prod.json",
"test": "nyc --require ts-node/register mocha src/**/*.spec.ts --reporter spec",
"coverage": "nyc report --reporter=text-lcov | coveralls",

View File

@@ -1,6 +1,11 @@
import { Paramtype } from './paramtype.interface';
export type Transform<T> = (value: T, metatype?, type?: Paramtype) => any;
export type Transform<T> = (value: T, metadata?: ArgumentMetadata) => any;
export interface ArgumentMetadata {
type: Paramtype;
metatype?: any;
}
export interface PipeTransform {
transform: Transform<any>;

View File

@@ -6,11 +6,21 @@ import { ExceptionFilters } from '../../utils/decorators/exception-filters.decor
describe('@ExceptionFilters', () => {
const filters = [ 'exception', 'exception2' ];
@ExceptionFilters(...filters) class Test {}
@ExceptionFilters(...filters as any) class Test {}
class TestWithMethod {
@ExceptionFilters(...filters as any)
public static test() {}
}
it('should enhance class with expected exception filters array', () => {
const metadata = Reflect.getMetadata(EXCEPTION_FILTERS_METADATA, Test);
expect(metadata).to.be.eql(filters);
});
it('should enhance method with expected exception filters array', () => {
const metadata = Reflect.getMetadata(EXCEPTION_FILTERS_METADATA, TestWithMethod.test);
expect(metadata).to.be.eql(filters);
});
});

View File

@@ -0,0 +1,26 @@
import 'reflect-metadata';
import { expect } from 'chai';
import { UsePipes } from '../../utils/decorators/use-pipes.decorator';
import { PIPES_METADATA } from './../../constants';
describe('@UsePipes', () => {
const pipes = [ 'pipe1', 'pipe2' ];
@UsePipes(...pipes as any) class Test {}
class TestWithMethod {
@UsePipes(...pipes as any)
public static test() {}
}
it('should enhance class with expected pipes array', () => {
const metadata = Reflect.getMetadata(PIPES_METADATA, Test);
expect(metadata).to.be.eql(pipes);
});
it('should enhance method with expected pipes array', () => {
const metadata = Reflect.getMetadata(PIPES_METADATA, TestWithMethod.test);
expect(metadata).to.be.eql(pipes);
});
});

View File

@@ -32,7 +32,6 @@ export class InstanceLoader {
modules.forEach((module) => {
this.createInstancesOfComponents(module);
this.createInstancesOfRoutes(module);
this.callModuleInitHook(module);
const { name } = module.metatype;
this.logger.log(ModuleInitMessage(name));
@@ -62,16 +61,4 @@ export class InstanceLoader {
this.injector.loadInstanceOfRoute(wrapper, module);
});
}
private callModuleInitHook(module: Module) {
const components = [...module.routes, ...module.components];
iterate(components).map(([key, {instance}]) => instance)
.filter((instance) => !isNil(instance))
.filter(this.hasOnModuleInitHook)
.forEach((instance) => (instance as OnModuleInit).onModuleInit());
}
private hasOnModuleInitHook(instance: Controller | Injectable): instance is OnModuleInit {
return !isUndefined((instance as OnModuleInit).onModuleInit);
}
}

View File

@@ -60,16 +60,20 @@ export class MiddlewaresModule {
public static setupMiddlewares(app: Application) {
const configs = this.container.getConfigs();
configs.forEach((moduleConfigs, module: string) => {
[ ...moduleConfigs ].forEach((config: MiddlewareConfiguration) => {
config.forRoutes.forEach((route: ControllerMetadata & { method: RequestMethod }) => {
this.setupRouteMiddleware(route, config, module, app);
});
this.setupMiddlewareConfig(config, module, app);
});
});
}
public static setupMiddlewareConfig(config: MiddlewareConfiguration, module: string, app: Application) {
const { forRoutes } = config;
forRoutes.forEach((route: ControllerMetadata & { method: RequestMethod }) => {
this.setupRouteMiddleware(route, config, module, app);
});
}
public static setupRouteMiddleware(
route: ControllerMetadata & { method: RequestMethod },
config: MiddlewareConfiguration,

View File

@@ -8,7 +8,7 @@ import { Logger } from '@nestjs/common/services/logger.service';
import { messages } from './constants';
import { MicroservicesModule } from '@nestjs/microservices/microservices-module';
import { Resolver } from './router/interfaces/resolver.interface';
import { INestApplication, INestMicroservice } from '@nestjs/common';
import { INestApplication, INestMicroservice, OnModuleInit } from '@nestjs/common';
import { ApplicationConfig } from './application-config';
import { validatePath, isNil, isUndefined } from '@nestjs/common/utils/shared.utils';
import { MicroserviceConfiguration } from '@nestjs/microservices';
@@ -50,6 +50,7 @@ export class NestApplication implements INestApplication {
validatePath(this.config.getGlobalPrefix()),
router,
);
this.callInitHook();
this.logger.log(messages.APPLICATION_READY);
this.isInitialized = true;
}
@@ -111,6 +112,25 @@ export class NestApplication implements INestApplication {
});
}
private callInitHook() {
const modules = this.container.getModules();
modules.forEach((module) => {
this.callModuleInitHook(module);
});
}
private callModuleInitHook(module: Module) {
const components = [...module.routes, ...module.components];
iterate(components).map(([key, {instance}]) => instance)
.filter((instance) => !isNil(instance))
.filter(this.hasOnModuleInitHook)
.forEach((instance) => (instance as OnModuleInit).onModuleInit());
}
private hasOnModuleInitHook(instance): instance is OnModuleInit {
return !isUndefined((instance as OnModuleInit).onModuleInit);
}
private callDestroyHook() {
const modules = this.container.getModules();
modules.forEach((module) => {

View File

@@ -5,8 +5,15 @@ import { ParamsTokenFactory } from './../pipes/params-token-factory';
export class PipesConsumer {
private readonly paramsTokenFactory = new ParamsTokenFactory();
public apply(value, metatype, type: RouteParamtypes, transforms: Transform<any>[]) {
public async apply(value, metatype, type: RouteParamtypes, transforms: Transform<any>[]) {
const token = this.paramsTokenFactory.exchangeEnumForString(type);
return transforms.reduce((val, fn) => fn(val, metatype, token), value);
return await transforms.reduce(async (defferedValue, fn) => {
const val = await defferedValue;
const result = fn(val, { metatype, type: token });
if (result instanceof Promise) {
return result;
}
return Promise.resolve(result);
}, Promise.resolve(value));
}
}

View File

@@ -19,12 +19,12 @@ export class PipesContextCreator extends ContextCreator {
if (isUndefined(metadata) || isEmpty(metadata)) {
return [];
}
return iterate(metadata).filter((pipe) => pipe.transform && isFunction(pipe.transform))
return iterate(metadata).filter((pipe) => pipe && pipe.transform && isFunction(pipe.transform))
.map((pipe) => pipe.transform.bind(pipe))
.toArray();
}
public getGlobalMetadata(): PipeTransform[] {
return [];
return this.config.getGlobalPipes();
}
}

View File

@@ -10,8 +10,13 @@ import { UnknownModuleException } from '../errors/exceptions/unknown-module.exce
import { ExceptionFilter } from '@nestjs/common/interfaces/exceptions/exception-filter.interface';
import { RouterProxyCallback } from './../router/router-proxy';
import { ContextCreator } from './../helpers/context-creator';
import { ApplicationConfig } from './../application-config';
export class RouterExceptionFilters extends ContextCreator {
constructor(private readonly config: ApplicationConfig) {
super();
}
public create(instance: Controller, callback: RouterProxyCallback): ExceptionsHandler {
const exceptionHandler = new ExceptionsHandler();
const filters = this.createContext(instance, callback, EXCEPTION_FILTERS_METADATA);
@@ -22,8 +27,8 @@ export class RouterExceptionFilters extends ContextCreator {
return exceptionHandler;
}
public getGlobalMetadata<T extends any[]>(): T {
return [] as T;
public getGlobalMetadata(): ExceptionFilter[] {
return this.config.getGlobalFilters();
}
public createConcreteContext(metadata: ExceptionFilter[]): ExceptionFilterMetadata[] {

View File

@@ -31,11 +31,12 @@ export class RouterExecutionContext {
const pipes = this.pipesContextCreator.create(instance, callback);
const paramtypes = this.reflectCallbackParamtypes(instance, callback);
return (req, res, next) => {
return async (req, res, next) => {
const paramProperties = this.exchangeKeysForValues(keys, metadata, { req, res, next });
paramProperties.forEach((param) => {
args[param.index] = this.getParamValue(param.value, paramtypes[param.index], param.type, pipes);
});
for (const param of paramProperties) {
const { index, value, type } = param;
args[index] = await this.getParamValue(value, paramtypes[index], type, pipes);
}
return callback.apply(instance, args);
};
}
@@ -76,13 +77,18 @@ export class RouterExecutionContext {
});
}
public getParamValue<T>(value: T, metatype, paramtype: RouteParamtypes, transforms: Transform<any>[]) {
public async getParamValue<T>(
value: T,
metatype,
paramtype: RouteParamtypes,
transforms: Transform<any>[]): Promise<any> {
if (paramtype === RouteParamtypes.BODY
|| paramtype === RouteParamtypes.QUERY
|| paramtype === RouteParamtypes.PARAM) {
return this.pipesConsumer.apply(value, metatype, paramtype, transforms);
return await this.pipesConsumer.apply(value, metatype, paramtype, transforms);
}
return value;
return Promise.resolve(value);
}
}

View File

@@ -14,14 +14,15 @@ import { ApplicationConfig } from './../application-config';
export class RoutesResolver implements Resolver {
private readonly logger = new Logger(RoutesResolver.name);
private readonly routerProxy = new RouterProxy();
private readonly routerExceptionsFilter = new RouterExceptionFilters();
private readonly routerExceptionsFilter: RouterExceptionFilters;
private readonly routerBuilder: RouterExplorer;
constructor(
private readonly container: NestContainer,
expressAdapter,
private readonly expressAdapter,
private readonly config: ApplicationConfig) {
this.routerExceptionsFilter = new RouterExceptionFilters(config);
this.routerBuilder = new ExpressRouterExplorer(
new MetadataScanner(),
this.routerProxy,

View File

@@ -0,0 +1,34 @@
import { expect } from 'chai';
import { ParamsTokenFactory } from './../../pipes/params-token-factory';
import { RouteParamtypes } from '../../../common/enums/route-paramtypes.enum';
describe('ParamsTokenFactory', () => {
let factory: ParamsTokenFactory;
beforeEach(() => {
factory = new ParamsTokenFactory();
});
describe('exchangeEnumForString', () => {
describe('when key is', () => {
describe(`RouteParamtypes.BODY`, () => {
it('should returns body object', () => {
expect(factory.exchangeEnumForString(RouteParamtypes.BODY)).to.be.eql('body');
});
});
describe(`RouteParamtypes.QUERY`, () => {
it('should returns query object', () => {
expect(factory.exchangeEnumForString(RouteParamtypes.QUERY)).to.be.eql('query');
});
});
describe(`RouteParamtypes.PARAM`, () => {
it('should returns params object', () => {
expect(factory.exchangeEnumForString(RouteParamtypes.PARAM)).to.be.eql('param');
});
});
describe('not available', () => {
it('should returns null', () => {
expect(factory.exchangeEnumForString(-1)).to.be.eql(null);
});
});
});
});
});

View File

@@ -0,0 +1,38 @@
import * as sinon from 'sinon';
import { expect } from 'chai';
import { PipesConsumer } from './../../pipes/pipes-consumer';
import { RouteParamtypes } from './../../../common/enums/route-paramtypes.enum';
describe('PipesConsumer', () => {
let consumer: PipesConsumer;
beforeEach(() => {
consumer = new PipesConsumer();
});
describe('apply', () => {
let value, metatype, type, stringifiedType, transforms;
beforeEach(() => {
value = 0;
metatype = {},
type = RouteParamtypes.QUERY;
stringifiedType = 'query';
transforms = [
sinon.stub().callsFake((val) => val + 1),
sinon.stub().callsFake((val) => Promise.resolve(val + 1)),
sinon.stub().callsFake((val) => val + 1),
];
});
it('should call all transform functions', (done) => {
consumer.apply(value, metatype, type, transforms).then(() => {
expect(transforms.reduce((prev, next) => prev && next.called, true)).to.be.true;
done();
});
});
it('should returns expected result', (done) => {
const expectedResult = 3;
consumer.apply(value, metatype, type, transforms).then((result) => {
expect(result).to.be.eql(expectedResult);
done();
});
});
});
});

View File

@@ -0,0 +1,29 @@
import * as sinon from 'sinon';
import { expect } from 'chai';
import { PipesContextCreator } from './../../pipes/pipes-context-creator';
describe('PipesContextCreator', () => {
let creator: PipesContextCreator;
beforeEach(() => {
creator = new PipesContextCreator(null);
});
describe('createConcreteContext', () => {
describe('when metadata is empty or undefined', () => {
it('should returns empty array', () => {
expect(creator.createConcreteContext(undefined)).to.be.deep.equal([]);
expect(creator.createConcreteContext([])).to.be.deep.equal([]);
});
});
describe('when metadata is not empty or undefined', () => {
const metadata = [
null,
{},
{ transform: () => ({}) },
];
it('should returns expected array', () => {
const transforms = creator.createConcreteContext(metadata as any);
expect(transforms).to.have.length(1);
});
});
});
});

View File

@@ -4,11 +4,11 @@ import { RouterExceptionFilters } from '../../router/router-exception-filters';
import { ExceptionFilters } from '../../../common/utils/decorators/exception-filters.decorator';
import { Catch } from '../../../common/utils/decorators/catch.decorator';
import { UnknownModuleException } from '../../errors/exceptions/unknown-module.exception';
import { ApplicationConfig } from '../../application-config';
describe('RouterExceptionFilters', () => {
let moduleName: string;
let exceptionFilter: RouterExceptionFilters;
let container: { getModules: sinon.SinonStub };
class CustomException {}
@Catch(CustomException)
@@ -17,66 +17,50 @@ describe('RouterExceptionFilters', () => {
}
beforeEach(() => {
container = {
getModules: sinon.stub(),
};
moduleName = 'Test';
exceptionFilter = new RouterExceptionFilters(container as any);
exceptionFilter = new RouterExceptionFilters(new ApplicationConfig());
});
describe('create', () => {
describe('when filters metadata is empty', () => {
class EmptyMetadata {}
beforeEach(() => {
sinon.stub(exceptionFilter, 'createContext').returns([]);
});
it('should returns plain ExceptionHandler object', () => {
const filter = exceptionFilter.create(new EmptyMetadata(), moduleName);
const filter = exceptionFilter.create(new EmptyMetadata(), () => ({}) as any);
expect((filter as any).filters).to.be.empty;
});
});
describe('when filters metadata is not empty', () => {
@ExceptionFilters(ExceptionFilter)
@ExceptionFilters(new ExceptionFilter())
class WithMetadata {}
beforeEach(() => {
sinon.stub(exceptionFilter, 'findExceptionsFilterInstance').returns({
catch: () => ({}),
});
});
it('should returns ExceptionHandler object with exception filters', () => {
const filter = exceptionFilter.create(new WithMetadata(), moduleName);
const filter = exceptionFilter.create(new WithMetadata(), () => ({}) as any);
expect((filter as any).filters).to.not.be.empty;
});
});
});
describe('reflectExceptionFilters', () => {
/*describe('reflectExceptionFilters', () => {
const filters = [ ExceptionFilter ];
@ExceptionFilters(...filters)
@ExceptionFilters(...filters as any)
class WithMetadata {}
it('should returns EXCEPTION_FILTERS_METADATA metadata', () => {
expect(
exceptionFilter.reflectExceptionFilters(new WithMetadata()),
).to.be.eql(filters);
});
});
describe('findExceptionsFilterInstance', () => {
beforeEach(() => {
container.getModules.returns({
has: (arg) => false,
});
});
it('should throws "UnknownModuleException" when module does not exists', () => {
expect(
() => exceptionFilter.findExceptionsFilterInstance(null, 'test'),
).to.throws(UnknownModuleException);
});
});
});*/
describe('reflectCatchExceptions', () => {
it('should returns FILTER_CATCH_EXCEPTIONS metadata', () => {
expect(
exceptionFilter.reflectCatchExceptions(ExceptionFilter),
exceptionFilter.reflectCatchExceptions(new ExceptionFilter()),
).to.be.eql([ CustomException ]);
});
});
describe('resolveFiltersMetatypes', () => {
describe('createConcreteContext', () => {
class InvalidFilter {}
const filters = [ ExceptionFilter, InvalidFilter, 'test' ];
const filters = [ new ExceptionFilter(), new InvalidFilter(), 'test' ];
beforeEach(() => {
sinon.stub(exceptionFilter, 'findExceptionsFilterInstance').onFirstCall().returns({
@@ -84,7 +68,7 @@ describe('RouterExceptionFilters', () => {
}).onSecondCall().returns({});
});
it('should returns expected exception filters metadata', () => {
const resolved = exceptionFilter.resolveFiltersMetatypes(filters as any, moduleName);
const resolved = exceptionFilter.createConcreteContext(filters as any);
expect(resolved).to.have.length(1);
expect(resolved[0].exceptionMetatypes).to.be.deep.equal([ CustomException ]);
expect(resolved[0].func).to.be.a('function');

View File

@@ -4,6 +4,9 @@ import { RouteParamtypes } from '../../../common/enums/route-paramtypes.enum';
import { RouterExecutionContext } from '../../router/router-execution-context';
import { RouteParamsMetadata, Request, Body } from '../../../index';
import { RouteParamsFactory } from '../../router/route-params-factory';
import { PipesContextCreator } from '../../pipes/pipes-context-creator';
import { PipesConsumer } from '../../pipes/pipes-consumer';
import { ApplicationConfig } from '../../application-config';
describe('RouterExecutionContext', () => {
let contextCreator: RouterExecutionContext;
@@ -11,6 +14,7 @@ describe('RouterExecutionContext', () => {
let applySpy: sinon.SinonSpy;
let bindSpy: sinon.SinonSpy;
let factory: RouteParamsFactory;
let consumer: PipesConsumer;
beforeEach(() => {
callback = {
@@ -19,8 +23,13 @@ describe('RouterExecutionContext', () => {
};
bindSpy = sinon.spy(callback, 'bind');
applySpy = sinon.spy(callback, 'apply');
factory = new RouteParamsFactory();
contextCreator = new RouterExecutionContext(factory);
consumer = new PipesConsumer();
contextCreator = new RouterExecutionContext(
factory, new PipesContextCreator(new ApplicationConfig()), consumer,
);
});
describe('create', () => {
describe('when callback metadata is undefined', () => {
@@ -41,6 +50,7 @@ describe('RouterExecutionContext', () => {
},
};
sinon.stub(contextCreator, 'reflectCallbackMetadata').returns(metadata);
sinon.stub(contextCreator, 'reflectCallbackParamtypes').returns([]);
});
describe('returns proxy function', () => {
let proxyContext;
@@ -66,20 +76,25 @@ describe('RouterExecutionContext', () => {
},
};
exchangeKeysForValuesSpy = sinon.spy(contextCreator, 'exchangeKeysForValues');
proxyContext(request, response, next);
});
it('should call "exchangeKeysForValues" with expected arguments', () => {
const keys = Object.keys(metadata);
it('should call "exchangeKeysForValues" with expected arguments', (done) => {
proxyContext(request, response, next).then(() => {
const keys = Object.keys(metadata);
expect(exchangeKeysForValuesSpy.called).to.be.true;
expect(
exchangeKeysForValuesSpy.calledWith(keys, metadata, { req: request, res: response, next }),
).to.be.true;
expect(exchangeKeysForValuesSpy.called).to.be.true;
expect(
exchangeKeysForValuesSpy.calledWith(keys, metadata, { req: request, res: response, next }),
).to.be.true;
done();
});
});
it('should apply expected context and arguments to callback', () => {
const args = [ next, null, request.body.test ];
expect(applySpy.called).to.be.true;
expect(applySpy.calledWith(instance, args)).to.be.true;
it('should apply expected context and arguments to callback', (done) => {
proxyContext(request, response, next).then(() => {
const args = [ next, null, request.body.test ];
expect(applySpy.called).to.be.true;
expect(applySpy.calledWith(instance, args)).to.be.true;
done();
});
});
});
});
@@ -140,10 +155,36 @@ describe('RouterExecutionContext', () => {
const keys = Object.keys(metadata);
const values = contextCreator.exchangeKeysForValues(keys, metadata, { res, req, next });
const expectedValues = [
{ index: 0, value: req },
{ index: 2, value: req.body },
{ index: 0, value: req, type: RouteParamtypes.REQUEST },
{ index: 2, value: req.body, type: RouteParamtypes.BODY },
];
expect(values).to.deep.equal(expectedValues);
});
});
describe('getParamValue', () => {
let consumerApplySpy: sinon.SinonSpy;
const value = 3, metatype = null, transforms = [];
beforeEach(() => {
consumerApplySpy = sinon.spy(consumer, 'apply');
});
describe('when paramtype is query, body or param', () => {
it('should call "consumer.apply" with expected arguments', () => {
contextCreator.getParamValue(value, metatype, RouteParamtypes.QUERY, transforms);
expect(consumerApplySpy.calledWith(value, metatype, RouteParamtypes.QUERY, transforms)).to.be.true;
contextCreator.getParamValue(value, metatype, RouteParamtypes.BODY, transforms);
expect(consumerApplySpy.calledWith(value, metatype, RouteParamtypes.BODY, transforms)).to.be.true;
contextCreator.getParamValue(value, metatype, RouteParamtypes.PARAM, transforms);
expect(consumerApplySpy.calledWith(value, metatype, RouteParamtypes.PARAM, transforms)).to.be.true;
});
});
describe('when paramtype is not query, body and param', () => {
it('should not call "consumer.apply"', () => {
contextCreator.getParamValue(value, metatype, RouteParamtypes.NEXT, transforms);
expect(consumerApplySpy.called).to.be.false;
});
});
});
});

View File

@@ -57,7 +57,7 @@ describe('RouterExplorer', () => {
{ path: 'test', requestMethod: RequestMethod.GET },
];
routerBuilder.applyPathsToRouterProxy(null, paths as any, null, '');
routerBuilder.applyPathsToRouterProxy(null, paths as any, null);
expect(bindStub.calledWith(null, paths[0], null)).to.be.true;
expect(bindStub.callCount).to.be.eql(paths.length);

View File

@@ -4,6 +4,7 @@ import { RoutesResolver } from '../../router/routes-resolver';
import { Controller } from '../../../common/utils/decorators/controller.decorator';
import { RequestMapping } from '../../../common/utils/decorators/request-mapping.decorator';
import { RequestMethod } from '../../../common/enums/request-method.enum';
import { ApplicationConfig } from '../../application-config';
describe('RoutesResolver', () => {
@Controller({ path: 'global' })
@@ -28,11 +29,10 @@ describe('RoutesResolver', () => {
beforeEach(() => {
routesResolver = new RoutesResolver(null, {
createRouter: () => router,
});
}, new ApplicationConfig());
});
describe('setupRouters', () => {
it('should method setup controllers to express application instance', () => {
const routes = new Map();
routes.set('TestRoute', {

View File

@@ -3,9 +3,10 @@ import { ClientRedis } from './client-redis';
import { ClientMetadata } from '../interfaces/client-metadata.interface';
import { Transport } from '../enums/transport.enum';
import { ClientProxy } from './client-proxy';
import { Closeable } from '../interfaces/closeable.interface';
export class ClientProxyFactory {
public static create(metadata: ClientMetadata): ClientProxy {
public static create(metadata: ClientMetadata): ClientProxy & Closeable {
const { transport } = metadata;
switch (transport) {

View File

@@ -19,10 +19,12 @@ export class ClientRedis extends ClientProxy {
const { url } = metadata;
this.url = url || DEFAULT_URL;
this.init();
}
public sendSingleMessage(msg, callback: (...args) => any) {
if (!this.pub || !this.sub) {
this.init();
}
const pattern = JSON.stringify(msg.pattern);
const responseCallback = (channel, message) => {
const { err, response, disposed } = JSON.parse(message);
@@ -49,7 +51,12 @@ export class ClientRedis extends ClientProxy {
return `${pattern}_res`;
}
private init() {
public close() {
this.pub && this.pub.quit();
this.sub && this.sub.quit();
}
public init() {
this.pub = this.createClient();
this.sub = this.createClient();
@@ -57,11 +64,11 @@ export class ClientRedis extends ClientProxy {
this.handleErrors(this.sub);
}
private createClient(): redis.RedisClient {
public createClient(): redis.RedisClient {
return redis.createClient({ url: this.url });
}
private handleErrors(stream) {
public handleErrors(stream) {
stream.on(ERROR_EVENT, (err) => this.logger.error(err));
}
}

View File

@@ -2,15 +2,21 @@ import * as net from 'net';
import * as JsonSocket from 'json-socket';
import { ClientProxy } from './client-proxy';
import { ClientMetadata } from '../interfaces/client-metadata.interface';
import { Logger } from '@nestjs/common';
const DEFAULT_PORT = 3000;
const DEFAULT_HOST = 'localhost';
const CONNECT_EVENT = 'connect';
const MESSAGE_EVENT = 'message';
const ERROR_EVENT = 'error';
const CLOSE_EVENT = 'close';
export class ClientTCP extends ClientProxy {
private readonly logger = new Logger(ClientTCP.name);
private readonly port: number;
private readonly host: string;
private isConnected = false;
private socket;
constructor({ port, host }: ClientMetadata) {
super();
@@ -18,13 +24,29 @@ export class ClientTCP extends ClientProxy {
this.host = host || DEFAULT_HOST;
}
public sendSingleMessage(msg, callback: (...args) => any) {
const socket = this.createSocket();
socket.connect(this.port, this.host);
socket.on(CONNECT_EVENT, () => {
public init(): Promise<{}> {
this.socket = this.createSocket();
return new Promise((resolve) => {
this.socket.on(CONNECT_EVENT, () => {
this.isConnected = true;
this.bindEvents(this.socket);
resolve(this.socket);
});
this.socket.connect(this.port, this.host);
});
}
public async sendSingleMessage(msg, callback: (...args) => any) {
const sendMessage = (socket) => {
socket.sendMessage(msg);
socket.on(MESSAGE_EVENT, (buffer) => this.handleResponse(socket, callback, buffer));
});
};
if (this.isConnected) {
sendMessage(this.socket);
return Promise.resolve();
}
const socket = await this.init();
sendMessage(socket);
}
public handleResponse(socket, callback: (...args) => any, buffer) {
@@ -40,4 +62,21 @@ export class ClientTCP extends ClientProxy {
public createSocket() {
return new JsonSocket(new net.Socket());
}
public close() {
if (!this.socket) {
return;
}
this.socket.close();
this.isConnected = false;
this.socket = null;
}
public bindEvents(socket) {
socket.on(ERROR_EVENT, (err) => this.logger.error(err));
socket.on(CLOSE_EVENT, () => {
this.isConnected = false;
this.socket = null;
});
}
}

View File

@@ -0,0 +1,20 @@
import { ClientProxy } from './index';
import { Closeable } from './interfaces/closeable.interface';
export type CloseableClient = Closeable & ClientProxy;
export class ClientsContainer {
private clients: CloseableClient[] = [];
public getAllClients(): CloseableClient[] {
return this.clients;
}
public addClient(client: CloseableClient) {
this.clients.push(client);
}
public clear() {
this.clients = [];
}
}

View File

@@ -0,0 +1,3 @@
export interface Closeable {
close(): void;
}

View File

@@ -5,10 +5,13 @@ import { Server } from './server/server';
import { ClientProxyFactory } from './client/client-proxy-factory';
import { MetadataScanner } from '@nestjs/core/metadata-scanner';
import { CustomTransportStrategy } from './interfaces';
import { ClientsContainer } from './container';
export class ListenersController {
private readonly metadataExplorer = new ListenerMetadataExplorer(new MetadataScanner());
constructor(private readonly clientsContainer: ClientsContainer) {}
public bindPatternHandlers(instance: Controller, server: Server & CustomTransportStrategy) {
const patternHandlers = this.metadataExplorer.explore(instance);
patternHandlers.forEach(({ pattern, targetCallback }) => server.add(pattern, targetCallback));
@@ -16,7 +19,10 @@ export class ListenersController {
public bindClientsToProperties(instance: Controller) {
for (const { property, metadata } of this.metadataExplorer.scanForClientHooks(instance)) {
Reflect.set(instance, property, ClientProxyFactory.create(metadata));
const client = ClientProxyFactory.create(metadata);
this.clientsContainer.addClient(client);
Reflect.set(instance, property, client);
}
}
}

View File

@@ -3,9 +3,13 @@ import { Controller } from '@nestjs/common/interfaces/controllers/controller.int
import { ListenersController } from './listeners-controller';
import { CustomTransportStrategy } from './interfaces';
import { Server } from './server/server';
import { ClientsContainer } from './container';
export class MicroservicesModule {
private static readonly listenersController = new ListenersController();
private static readonly clientsContainer = new ClientsContainer();
private static readonly listenersController = new ListenersController(
MicroservicesModule.clientsContainer,
);
public static setupListeners(container, server: Server & CustomTransportStrategy) {
const modules = container.getModules();
@@ -34,4 +38,10 @@ export class MicroservicesModule {
!isNotMetatype && this.listenersController.bindClientsToProperties(instance);
});
}
public static close() {
const clients = this.clientsContainer.getAllClients();
clients.forEach((client) => client.close());
this.clientsContainer.clear();
}
}

View File

@@ -26,6 +26,7 @@ describe('ClientRedis', () => {
onSpy: sinon.SinonSpy,
removeListenerSpy: sinon.SinonSpy,
unsubscribeSpy: sinon.SinonSpy,
initSpy: sinon.SinonSpy,
sub,
pub;
@@ -45,6 +46,20 @@ describe('ClientRedis', () => {
pub = { publish: publishSpy };
(client as any).sub = sub;
(client as any).pub = pub;
initSpy = sinon.spy(client, 'init');
});
afterEach(() => {
initSpy.restore();
});
it('should not call "init()" when pub and sub are null', () => {
client.sendSingleMessage(msg, () => {});
expect(initSpy.called).to.be.false;
});
it('should call "init()" when pub and sub are null', () => {
(client as any).sub = null;
(client as any).pub = null;
client.sendSingleMessage(msg, () => {});
expect(initSpy.called).to.be.true;
});
it('should subscribe to response pattern name', () => {
client.sendSingleMessage(msg, () => {});
@@ -100,4 +115,55 @@ describe('ClientRedis', () => {
});
});
});
describe('close', () => {
let pubClose: sinon.SinonSpy;
let subClose: sinon.SinonSpy;
let pub, sub;
beforeEach(() => {
pubClose = sinon.spy();
subClose = sinon.spy();
pub = { quit: pubClose };
sub = { quit: subClose };
(client as any).pub = pub;
(client as any).sub = sub;
});
it('should close "pub" when it is not null', () => {
client.close();
expect(pubClose.called).to.be.true;
});
it('should not close "pub" when it is null', () => {
(client as any).pub = null;
client.close();
expect(pubClose.called).to.be.false;
});
it('should close "sub" when it is not null', () => {
client.close();
expect(subClose.called).to.be.true;
});
it('should not close "sub" when it is null', () => {
(client as any).sub = null;
client.close();
expect(subClose.called).to.be.false;
});
});
describe('init', () => {
let createClientSpy: sinon.SinonSpy;
let handleErrorsSpy: sinon.SinonSpy;
beforeEach(() => {
createClientSpy = sinon.spy(client, 'createClient');
handleErrorsSpy = sinon.spy(client, 'handleErrors');
client.init();
});
afterEach(() => {
createClientSpy.restore();
handleErrorsSpy.restore();
});
it('should call "createClient" twice', () => {
expect(createClientSpy.calledTwice).to.be.true;
});
it('should call "handleErrors" twice', () => {
expect(handleErrorsSpy.calledTwice).to.be.true;
});
});
});

View File

@@ -16,7 +16,7 @@ describe('ClientTCP', () => {
socket = {
connect: sinon.spy(),
sendMessage: sinon.spy(),
on: sinon.stub().callsFake((event, callback) => callback({})),
on: sinon.stub().callsFake((event, callback) => event !== 'error' && event !== 'close' && callback({})),
close: sinon.spy(),
};
createSocketStub = sinon.stub(client, 'createSocket').callsFake(() => socket);
@@ -28,18 +28,33 @@ describe('ClientTCP', () => {
let msg;
beforeEach(() => {
msg = { test: 3 };
client.sendSingleMessage(msg, () => ({}));
});
it('should connect to server', () => {
expect(socket.connect.called).to.be.true;
it('should connect to server when is not connected', (done) => {
client.sendSingleMessage(msg, () => ({})).then(() => {
expect(socket.connect.calledOnce).to.be.true;
done();
});
});
it('should not connect to server when is already connected', () => {
(client as any).isConnected = true;
client.sendSingleMessage(msg, () => ({}));
expect(socket.connect.called).to.be.false;
});
describe('after connection', () => {
it('should send message', () => {
expect(socket.sendMessage.called).to.be.true;
expect(socket.sendMessage.calledWith(msg)).to.be.true;
it('should send message', (done) => {
(client as any).isConnected = false;
client.sendSingleMessage(msg, () => ({})).then(() => {
expect(socket.sendMessage.called).to.be.true;
expect(socket.sendMessage.calledWith(msg)).to.be.true;
done();
});
});
it('should listen on messages', () => {
expect(socket.on.called).to.be.true;
it('should listen on messages', (done) => {
(client as any).isConnected = false;
client.sendSingleMessage(msg, () => ({})).then(() => {
expect(socket.on.called).to.be.true;
done();
});
});
});
});
@@ -74,4 +89,20 @@ describe('ClientTCP', () => {
});
});
});
describe('close', () => {
beforeEach(() => {
(client as any).socket = socket;
(client as any).isConnected = true;
client.close();
});
it('should close() socket', () => {
expect(socket.close.called).to.be.true;
});
it('should set "isConnected" to false', () => {
expect((client as any).isConnected).to.be.false;
});
it('should set "socket" to null', () => {
expect((client as any).socket).to.be.null;
});
});
});

View File

@@ -0,0 +1,32 @@
import * as sinon from 'sinon';
import { expect } from 'chai';
import { ClientsContainer } from '../container';
describe('ClientsContainer', () => {
let instance: ClientsContainer;
beforeEach(() => {
instance = new ClientsContainer();
});
describe('getAllClients', () => {
it('should returns array of clients', () => {
const clients = [ 1, 2, 3 ];
(instance as any).clients = clients;
expect(instance.getAllClients()).to.be.eql(clients);
});
});
describe('addClient', () => {
it('should push client into clients array', () => {
const client = 'test';
instance.addClient(client as any);
expect(instance.getAllClients()).to.be.deep.equal([client]);
});
});
describe('clear', () => {
it('should remove all clients', () => {
const clients = [ 1, 2, 3 ];
(instance as any).clients = clients;
instance.clear();
expect(instance.getAllClients()).to.be.deep.equal([]);
});
});
});

View File

@@ -3,6 +3,7 @@ import { expect } from 'chai';
import { ListenersController } from '../listeners-controller';
import { ListenerMetadataExplorer } from '../listener-metadata-explorer';
import { MetadataScanner } from '../../core/metadata-scanner';
import { ClientsContainer } from '../container';
describe('ListenersController', () => {
let instance: ListenersController,
@@ -16,7 +17,7 @@ describe('ListenersController', () => {
explorer = sinon.mock(metadataExplorer);
});
beforeEach(() => {
instance = new ListenersController();
instance = new ListenersController(new ClientsContainer());
(instance as any).metadataExplorer = metadataExplorer;
addSpy = sinon.spy();
server = {

View File

@@ -20,4 +20,8 @@ export class SocketsContainer {
port,
}, server);
}
public clear() {
this.observableServers.clear();
}
}

View File

@@ -41,5 +41,6 @@ export class SocketModule {
public static close() {
const servers = this.socketsContainer.getAllServers();
servers.forEach(({ server }) => server.close());
this.socketsContainer.clear();
}
}