mirror of
https://github.com/nestjs/nest.git
synced 2026-02-21 23:11:44 +00:00
refactor(nestjs) microservices improvements, reduce memory usage
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
import { INestApplication } from '@nestjs/common';
|
||||
import { NestFactory } from '@nestjs/core';
|
||||
import { Test } from '@nestjs/testing';
|
||||
import * as request from 'supertest';
|
||||
import { ApplicationModule } from './../src/app.module';
|
||||
|
||||
@@ -7,7 +7,11 @@ describe('GraphQL', () => {
|
||||
let app: INestApplication;
|
||||
|
||||
beforeEach(async () => {
|
||||
app = await NestFactory.create(ApplicationModule, { logger: false });
|
||||
const module = await Test.createTestingModule({
|
||||
imports: [ApplicationModule],
|
||||
}).compile();
|
||||
|
||||
app = module.createNestApplication();
|
||||
await app.init();
|
||||
});
|
||||
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import * as express from 'express';
|
||||
import * as request from 'supertest';
|
||||
import { Test } from '@nestjs/testing';
|
||||
import { INestApplication } from '@nestjs/common';
|
||||
import { Transport } from '@nestjs/microservices';
|
||||
import { Test } from '@nestjs/testing';
|
||||
import * as express from 'express';
|
||||
import * as request from 'supertest';
|
||||
import { MqttController } from '../src/mqtt/mqtt.controller';
|
||||
|
||||
describe('MQTT transport', () => {
|
||||
@@ -49,11 +49,16 @@ describe('MQTT transport', () => {
|
||||
return request(server)
|
||||
.post('/concurrent')
|
||||
.send([
|
||||
[1, 2, 3, 4, 5],
|
||||
[6, 7, 8, 9, 10],
|
||||
[11, 12, 13, 14, 15],
|
||||
[16, 17, 18, 19, 20],
|
||||
[21, 22, 23, 24, 25],
|
||||
Array.from({ length: 10 }, (v, k) => k + 1),
|
||||
Array.from({ length: 10 }, (v, k) => k + 11),
|
||||
Array.from({ length: 10 }, (v, k) => k + 21),
|
||||
Array.from({ length: 10 }, (v, k) => k + 31),
|
||||
Array.from({ length: 10 }, (v, k) => k + 41),
|
||||
Array.from({ length: 10 }, (v, k) => k + 51),
|
||||
Array.from({ length: 10 }, (v, k) => k + 61),
|
||||
Array.from({ length: 10 }, (v, k) => k + 71),
|
||||
Array.from({ length: 10 }, (v, k) => k + 81),
|
||||
Array.from({ length: 10 }, (v, k) => k + 91),
|
||||
])
|
||||
.expect(200, 'true');
|
||||
});
|
||||
|
||||
@@ -55,6 +55,24 @@ describe('NATS transport', () => {
|
||||
.expect(200, '15');
|
||||
});
|
||||
|
||||
it(`/POST (concurrent)`, () => {
|
||||
return request(server)
|
||||
.post('/concurrent')
|
||||
.send([
|
||||
Array.from({ length: 10 }, (v, k) => k + 1),
|
||||
Array.from({ length: 10 }, (v, k) => k + 11),
|
||||
Array.from({ length: 10 }, (v, k) => k + 21),
|
||||
Array.from({ length: 10 }, (v, k) => k + 31),
|
||||
Array.from({ length: 10 }, (v, k) => k + 41),
|
||||
Array.from({ length: 10 }, (v, k) => k + 51),
|
||||
Array.from({ length: 10 }, (v, k) => k + 61),
|
||||
Array.from({ length: 10 }, (v, k) => k + 71),
|
||||
Array.from({ length: 10 }, (v, k) => k + 81),
|
||||
Array.from({ length: 10 }, (v, k) => k + 91),
|
||||
])
|
||||
.expect(200, 'true');
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await app.close();
|
||||
});
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import * as express from 'express';
|
||||
import * as request from 'supertest';
|
||||
import { Test } from '@nestjs/testing';
|
||||
import { INestApplication } from '@nestjs/common';
|
||||
import { Transport } from '@nestjs/microservices';
|
||||
import { Test } from '@nestjs/testing';
|
||||
import * as express from 'express';
|
||||
import * as request from 'supertest';
|
||||
import { RedisController } from '../src/redis/redis.controller';
|
||||
|
||||
describe('REDIS transport', () => {
|
||||
@@ -49,11 +49,16 @@ describe('REDIS transport', () => {
|
||||
return request(server)
|
||||
.post('/concurrent')
|
||||
.send([
|
||||
[1, 2, 3, 4, 5],
|
||||
[6, 7, 8, 9, 10],
|
||||
[11, 12, 13, 14, 15],
|
||||
[16, 17, 18, 19, 20],
|
||||
[21, 22, 23, 24, 25],
|
||||
Array.from({ length: 10 }, (v, k) => k + 1),
|
||||
Array.from({ length: 10 }, (v, k) => k + 11),
|
||||
Array.from({ length: 10 }, (v, k) => k + 21),
|
||||
Array.from({ length: 10 }, (v, k) => k + 31),
|
||||
Array.from({ length: 10 }, (v, k) => k + 41),
|
||||
Array.from({ length: 10 }, (v, k) => k + 51),
|
||||
Array.from({ length: 10 }, (v, k) => k + 61),
|
||||
Array.from({ length: 10 }, (v, k) => k + 71),
|
||||
Array.from({ length: 10 }, (v, k) => k + 81),
|
||||
Array.from({ length: 10 }, (v, k) => k + 91),
|
||||
])
|
||||
.expect(200, 'true');
|
||||
});
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import { INestApplication } from '@nestjs/common';
|
||||
import { Transport } from '@nestjs/microservices';
|
||||
import { Test } from '@nestjs/testing';
|
||||
import * as express from 'express';
|
||||
import * as request from 'supertest';
|
||||
import { Test } from '@nestjs/testing';
|
||||
import { INestApplication } from '@nestjs/common';
|
||||
import { ApplicationModule } from './../src/app.module';
|
||||
import { Transport } from '@nestjs/microservices';
|
||||
|
||||
describe('RPC transport', () => {
|
||||
let server;
|
||||
@@ -49,11 +49,16 @@ describe('RPC transport', () => {
|
||||
return request(server)
|
||||
.post('/concurrent')
|
||||
.send([
|
||||
[1, 2, 3, 4, 5],
|
||||
[6, 7, 8, 9, 10],
|
||||
[11, 12, 13, 14, 15],
|
||||
[16, 17, 18, 19, 20],
|
||||
[21, 22, 23, 24, 25],
|
||||
Array.from({ length: 10 }, (v, k) => k + 1),
|
||||
Array.from({ length: 10 }, (v, k) => k + 11),
|
||||
Array.from({ length: 10 }, (v, k) => k + 21),
|
||||
Array.from({ length: 10 }, (v, k) => k + 31),
|
||||
Array.from({ length: 10 }, (v, k) => k + 41),
|
||||
Array.from({ length: 10 }, (v, k) => k + 51),
|
||||
Array.from({ length: 10 }, (v, k) => k + 61),
|
||||
Array.from({ length: 10 }, (v, k) => k + 71),
|
||||
Array.from({ length: 10 }, (v, k) => k + 81),
|
||||
Array.from({ length: 10 }, (v, k) => k + 91),
|
||||
])
|
||||
.expect(200, 'true');
|
||||
});
|
||||
|
||||
@@ -33,7 +33,7 @@ export class MqttController {
|
||||
|
||||
@Post('concurrent')
|
||||
@HttpCode(200)
|
||||
concurrent(@Body() data: number[][]): Promise<boolean> {
|
||||
async concurrent(@Body() data: number[][]): Promise<boolean> {
|
||||
const send = async (tab: number[]) => {
|
||||
const expected = tab.reduce((a, b) => a + b);
|
||||
const result = await this.client
|
||||
@@ -42,7 +42,7 @@ export class MqttController {
|
||||
|
||||
return result === expected;
|
||||
};
|
||||
return data
|
||||
return await data
|
||||
.map(async tab => await send(tab))
|
||||
.reduce(async (a, b) => (await a) && (await b));
|
||||
}
|
||||
|
||||
13
packages/core/helpers/application-ref-host.ts
Normal file
13
packages/core/helpers/application-ref-host.ts
Normal file
@@ -0,0 +1,13 @@
|
||||
import { HttpServer } from '@nestjs/common';
|
||||
|
||||
export class ApplicationReferenceHost {
|
||||
private _applicationRef: HttpServer | any;
|
||||
|
||||
set applicationRef(applicationRef: any) {
|
||||
this._applicationRef = applicationRef;
|
||||
}
|
||||
|
||||
get applicationRef(): HttpServer | any {
|
||||
return this._applicationRef;
|
||||
}
|
||||
}
|
||||
@@ -6,6 +6,7 @@ import 'reflect-metadata';
|
||||
import { FORBIDDEN_MESSAGE } from '../guards/constants';
|
||||
import { GuardsConsumer } from '../guards/guards-consumer';
|
||||
import { GuardsContextCreator } from '../guards/guards-context-creator';
|
||||
import { NestContainer } from '../injector/container';
|
||||
import { Module } from '../injector/module';
|
||||
import { ModulesContainer } from '../injector/modules-container';
|
||||
import { InterceptorsConsumer } from '../interceptors/interceptors-consumer';
|
||||
@@ -38,6 +39,18 @@ export class ExternalContextCreator {
|
||||
private readonly pipesConsumer: PipesConsumer,
|
||||
) {}
|
||||
|
||||
static fromContainer(container: NestContainer): ExternalContextCreator {
|
||||
return new ExternalContextCreator(
|
||||
new GuardsContextCreator(container, container.applicationConfig),
|
||||
new GuardsConsumer(),
|
||||
new InterceptorsContextCreator(container, container.applicationConfig),
|
||||
new InterceptorsConsumer(),
|
||||
container.getModules(),
|
||||
new PipesContextCreator(container, container.applicationConfig),
|
||||
new PipesConsumer(),
|
||||
);
|
||||
}
|
||||
|
||||
public create<T extends ParamsMetadata = ParamsMetadata>(
|
||||
instance: Controller,
|
||||
callback: (...args) => any,
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
export * from './adapters';
|
||||
export { APP_FILTER, APP_GUARD, APP_INTERCEPTOR, APP_PIPE } from './constants';
|
||||
export { BaseExceptionFilter } from './exceptions/base-exception-filter';
|
||||
export { ApplicationReferenceHost } from './helpers/application-ref-host';
|
||||
export { ModuleRef } from './injector/module-ref';
|
||||
export { HTTP_SERVER_REF } from './injector/tokens';
|
||||
export { MiddlewareBuilder } from './middleware/builder';
|
||||
|
||||
@@ -5,6 +5,9 @@ import 'reflect-metadata';
|
||||
import { ApplicationConfig } from '../application-config';
|
||||
import { InvalidModuleException } from '../errors/exceptions/invalid-module.exception';
|
||||
import { UnknownModuleException } from '../errors/exceptions/unknown-module.exception';
|
||||
import { ApplicationReferenceHost } from '../helpers/application-ref-host';
|
||||
import { ExternalContextCreator } from '../helpers/external-context-creator';
|
||||
import { Reflector } from '../services';
|
||||
import { ModuleCompiler } from './compiler';
|
||||
import { Module } from './module';
|
||||
import { ModulesContainer } from './modules-container';
|
||||
@@ -17,6 +20,10 @@ export class NestContainer {
|
||||
string,
|
||||
Partial<DynamicModule>
|
||||
>();
|
||||
private readonly reflector = new Reflector();
|
||||
private readonly applicationRefHost = new ApplicationReferenceHost();
|
||||
private externalContextCreator: ExternalContextCreator;
|
||||
private modulesContainer: ModulesContainer;
|
||||
private applicationRef: any;
|
||||
|
||||
constructor(
|
||||
@@ -29,6 +36,11 @@ export class NestContainer {
|
||||
|
||||
public setApplicationRef(applicationRef: any) {
|
||||
this.applicationRef = applicationRef;
|
||||
|
||||
if (!this.applicationRefHost) {
|
||||
return;
|
||||
}
|
||||
this.applicationRefHost.applicationRef = applicationRef;
|
||||
}
|
||||
|
||||
public getApplicationRef() {
|
||||
@@ -177,6 +189,28 @@ export class NestContainer {
|
||||
}
|
||||
return [];
|
||||
}
|
||||
|
||||
public getReflector(): Reflector {
|
||||
return this.reflector;
|
||||
}
|
||||
|
||||
public getExternalContextCreator(): ExternalContextCreator {
|
||||
if (!this.externalContextCreator) {
|
||||
this.externalContextCreator = ExternalContextCreator.fromContainer(this);
|
||||
}
|
||||
return this.externalContextCreator;
|
||||
}
|
||||
|
||||
public getApplicationRefHost(): ApplicationReferenceHost {
|
||||
return this.applicationRefHost;
|
||||
}
|
||||
|
||||
public getModulesContainer(): ModulesContainer {
|
||||
if (!this.modulesContainer) {
|
||||
this.modulesContainer = this.getModules();
|
||||
}
|
||||
return this.modulesContainer;
|
||||
}
|
||||
}
|
||||
|
||||
export interface InstanceWrapper<T> {
|
||||
|
||||
@@ -15,13 +15,8 @@ import {
|
||||
} from '@nestjs/common/utils/shared.utils';
|
||||
import { RuntimeException } from '../errors/exceptions/runtime.exception';
|
||||
import { UnknownExportException } from '../errors/exceptions/unknown-export.exception';
|
||||
import { GuardsConsumer } from '../guards/guards-consumer';
|
||||
import { GuardsContextCreator } from '../guards/guards-context-creator';
|
||||
import { ApplicationReferenceHost } from '../helpers/application-ref-host';
|
||||
import { ExternalContextCreator } from '../helpers/external-context-creator';
|
||||
import { InterceptorsConsumer } from '../interceptors/interceptors-consumer';
|
||||
import { InterceptorsContextCreator } from '../interceptors/interceptors-context-creator';
|
||||
import { PipesConsumer } from '../pipes/pipes-consumer';
|
||||
import { PipesContextCreator } from '../pipes/pipes-context-creator';
|
||||
import { Reflector } from '../services/reflector.service';
|
||||
import { InstanceWrapper, NestContainer } from './container';
|
||||
import { ModuleRef } from './module-ref';
|
||||
@@ -105,10 +100,11 @@ export class Module {
|
||||
public addCoreInjectables(container: NestContainer) {
|
||||
this.addModuleAsComponent();
|
||||
this.addModuleRef();
|
||||
this.addReflector();
|
||||
this.addReflector(container.getReflector());
|
||||
this.addApplicationRef(container.getApplicationRef());
|
||||
this.addExternalContextCreator(container);
|
||||
this.addModulesContainer(container);
|
||||
this.addExternalContextCreator(container.getExternalContextCreator());
|
||||
this.addModulesContainer(container.getModulesContainer());
|
||||
this.addApplicationRefHost(container.getApplicationRefHost());
|
||||
}
|
||||
|
||||
public addModuleRef() {
|
||||
@@ -130,12 +126,12 @@ export class Module {
|
||||
});
|
||||
}
|
||||
|
||||
public addReflector() {
|
||||
public addReflector(reflector: Reflector) {
|
||||
this._components.set(Reflector.name, {
|
||||
name: Reflector.name,
|
||||
metatype: Reflector,
|
||||
isResolved: false,
|
||||
instance: null,
|
||||
isResolved: true,
|
||||
instance: reflector,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -148,29 +144,32 @@ export class Module {
|
||||
});
|
||||
}
|
||||
|
||||
public addExternalContextCreator(container: NestContainer) {
|
||||
public addExternalContextCreator(
|
||||
externalContextCreator: ExternalContextCreator,
|
||||
) {
|
||||
this._components.set(ExternalContextCreator.name, {
|
||||
name: ExternalContextCreator.name,
|
||||
metatype: ExternalContextCreator,
|
||||
isResolved: true,
|
||||
instance: new ExternalContextCreator(
|
||||
new GuardsContextCreator(container, container.applicationConfig),
|
||||
new GuardsConsumer(),
|
||||
new InterceptorsContextCreator(container, container.applicationConfig),
|
||||
new InterceptorsConsumer(),
|
||||
container.getModules(),
|
||||
new PipesContextCreator(container, container.applicationConfig),
|
||||
new PipesConsumer(),
|
||||
),
|
||||
instance: externalContextCreator,
|
||||
});
|
||||
}
|
||||
|
||||
public addModulesContainer(container: NestContainer) {
|
||||
public addModulesContainer(modulesContainer: ModulesContainer) {
|
||||
this._components.set(ModulesContainer.name, {
|
||||
name: ModulesContainer.name,
|
||||
metatype: ModulesContainer,
|
||||
isResolved: true,
|
||||
instance: container.getModules(),
|
||||
instance: modulesContainer,
|
||||
});
|
||||
}
|
||||
|
||||
public addApplicationRefHost(applicationRefHost: ApplicationReferenceHost) {
|
||||
this._components.set(ApplicationReferenceHost.name, {
|
||||
name: ApplicationReferenceHost.name,
|
||||
metatype: ApplicationReferenceHost,
|
||||
isResolved: true,
|
||||
instance: applicationRefHost,
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { Logger } from '@nestjs/common/services/logger.service';
|
||||
import { loadPackage } from '@nestjs/common/utils/load-package.util';
|
||||
import { fromEvent, merge, Observable } from 'rxjs';
|
||||
import { first, map, share } from 'rxjs/operators';
|
||||
import { first, map, share, tap } from 'rxjs/operators';
|
||||
import {
|
||||
CLOSE_EVENT,
|
||||
ERROR_EVENT,
|
||||
@@ -52,7 +52,12 @@ export class ClientMqtt extends ClientProxy {
|
||||
|
||||
const connect$ = this.connect$(this.mqttClient);
|
||||
this.connection = this.mergeCloseEvent(this.mqttClient, connect$)
|
||||
.pipe(share())
|
||||
.pipe(
|
||||
tap(() =>
|
||||
this.mqttClient.on(MESSAGE_EVENT, this.createResponseCallback()),
|
||||
),
|
||||
share(),
|
||||
)
|
||||
.toPromise();
|
||||
return this.connection;
|
||||
}
|
||||
@@ -80,15 +85,14 @@ export class ClientMqtt extends ClientProxy {
|
||||
);
|
||||
}
|
||||
|
||||
public createResponseCallback(
|
||||
packet: ReadPacket & PacketId,
|
||||
callback: (packet: WritePacket) => any,
|
||||
): (channel: string, buffer) => any {
|
||||
public createResponseCallback(): (channel: string, buffer) => any {
|
||||
return (channel: string, buffer: Buffer) => {
|
||||
const { err, response, isDisposed, id } = JSON.parse(
|
||||
buffer.toString(),
|
||||
) as WritePacket & PacketId;
|
||||
if (id !== packet.id) {
|
||||
|
||||
const callback = this.routingMap.get(id);
|
||||
if (!callback) {
|
||||
return undefined;
|
||||
}
|
||||
if (isDisposed || err) {
|
||||
@@ -113,17 +117,20 @@ export class ClientMqtt extends ClientProxy {
|
||||
const packet = this.assignPacketId(partialPacket);
|
||||
const pattern = this.normalizePattern(partialPacket.pattern);
|
||||
const responseChannel = this.getResPatternName(pattern);
|
||||
const responseCallback = this.createResponseCallback(packet, callback);
|
||||
|
||||
this.mqttClient.on(MESSAGE_EVENT, responseCallback);
|
||||
this.mqttClient.subscribe(responseChannel);
|
||||
this.mqttClient.subscribe(responseChannel, err => {
|
||||
if (err) {
|
||||
return;
|
||||
}
|
||||
this.routingMap.set(packet.id, callback);
|
||||
this.mqttClient.publish(
|
||||
this.getAckPatternName(pattern),
|
||||
JSON.stringify(packet),
|
||||
);
|
||||
});
|
||||
return () => {
|
||||
this.mqttClient.unsubscribe(responseChannel);
|
||||
this.mqttClient.removeListener(MESSAGE_EVENT, responseCallback);
|
||||
this.routingMap.delete(packet.id);
|
||||
};
|
||||
} catch (err) {
|
||||
callback({ err });
|
||||
|
||||
@@ -21,6 +21,7 @@ import {
|
||||
export abstract class ClientProxy {
|
||||
public abstract connect(): Promise<any>;
|
||||
public abstract close(): any;
|
||||
protected routingMap = new Map<string, Function>();
|
||||
|
||||
public send<TResult = any, TInput = any>(
|
||||
pattern: any,
|
||||
|
||||
@@ -1,13 +1,12 @@
|
||||
import { Logger } from '@nestjs/common/services/logger.service';
|
||||
import { loadPackage } from '@nestjs/common/utils/load-package.util';
|
||||
import { fromEvent, merge, Subject, zip } from 'rxjs';
|
||||
import { take } from 'rxjs/operators';
|
||||
import { share, take, tap } from 'rxjs/operators';
|
||||
import {
|
||||
CONNECT_EVENT,
|
||||
ERROR_EVENT,
|
||||
MESSAGE_EVENT,
|
||||
REDIS_DEFAULT_URL,
|
||||
SUBSCRIBE,
|
||||
} from '../constants';
|
||||
import {
|
||||
ClientOpts,
|
||||
@@ -26,6 +25,7 @@ export class ClientRedis extends ClientProxy {
|
||||
protected readonly url: string;
|
||||
protected pubClient: RedisClient;
|
||||
protected subClient: RedisClient;
|
||||
protected connection: Promise<any>;
|
||||
private isExplicitlyTerminated = false;
|
||||
|
||||
constructor(protected readonly options: ClientOptions['options']) {
|
||||
@@ -52,9 +52,8 @@ export class ClientRedis extends ClientProxy {
|
||||
|
||||
public connect(): Promise<any> {
|
||||
if (this.pubClient && this.subClient) {
|
||||
return Promise.resolve();
|
||||
return this.connection;
|
||||
}
|
||||
return new Promise((resolve, reject) => {
|
||||
const error$ = new Subject<Error>();
|
||||
|
||||
this.pubClient = this.createClient(error$);
|
||||
@@ -65,10 +64,16 @@ export class ClientRedis extends ClientProxy {
|
||||
const pubConnect$ = fromEvent(this.pubClient, CONNECT_EVENT);
|
||||
const subClient$ = fromEvent(this.subClient, CONNECT_EVENT);
|
||||
|
||||
merge(error$, zip(pubConnect$, subClient$))
|
||||
.pipe(take(1))
|
||||
.subscribe(resolve, reject);
|
||||
});
|
||||
this.connection = merge(error$, zip(pubConnect$, subClient$))
|
||||
.pipe(
|
||||
take(1),
|
||||
tap(() =>
|
||||
this.subClient.on(MESSAGE_EVENT, this.createResponseCallback()),
|
||||
),
|
||||
share(),
|
||||
)
|
||||
.toPromise();
|
||||
return this.connection;
|
||||
}
|
||||
|
||||
public createClient(error$: Subject<Error>): RedisClient {
|
||||
@@ -108,16 +113,15 @@ export class ClientRedis extends ClientProxy {
|
||||
return this.getOptionsProp(this.options, 'retryDelay') || 0;
|
||||
}
|
||||
|
||||
public createResponseCallback(
|
||||
packet: ReadPacket & PacketId,
|
||||
callback: (packet: WritePacket) => any,
|
||||
): Function {
|
||||
public createResponseCallback(): Function {
|
||||
return (channel: string, buffer: string) => {
|
||||
const { err, response, isDisposed, id } = JSON.parse(
|
||||
buffer,
|
||||
) as WritePacket & PacketId;
|
||||
if (id !== packet.id) {
|
||||
return undefined;
|
||||
|
||||
const callback = this.routingMap.get(id);
|
||||
if (!callback) {
|
||||
return;
|
||||
}
|
||||
if (isDisposed || err) {
|
||||
return callback({
|
||||
@@ -141,26 +145,21 @@ export class ClientRedis extends ClientProxy {
|
||||
const packet = this.assignPacketId(partialPacket);
|
||||
const pattern = this.normalizePattern(partialPacket.pattern);
|
||||
const responseChannel = this.getResPatternName(pattern);
|
||||
const responseCallback = this.createResponseCallback(packet, callback);
|
||||
|
||||
this.subClient.on(MESSAGE_EVENT, responseCallback);
|
||||
this.subClient.subscribe(responseChannel);
|
||||
|
||||
const handler = channel => {
|
||||
if (channel && channel !== responseChannel) {
|
||||
return undefined;
|
||||
this.routingMap.set(packet.id, callback);
|
||||
this.subClient.subscribe(responseChannel, err => {
|
||||
if (err) {
|
||||
return;
|
||||
}
|
||||
this.subClient.removeListener(SUBSCRIBE, handler);
|
||||
};
|
||||
this.subClient.on(SUBSCRIBE, handler);
|
||||
|
||||
this.pubClient.publish(
|
||||
this.getAckPatternName(pattern),
|
||||
JSON.stringify(packet),
|
||||
);
|
||||
});
|
||||
|
||||
return () => {
|
||||
this.subClient.unsubscribe(responseChannel);
|
||||
this.subClient.removeListener(MESSAGE_EVENT, responseCallback);
|
||||
this.routingMap.delete(packet.id);
|
||||
};
|
||||
} catch (err) {
|
||||
callback({ err });
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { Logger } from '@nestjs/common';
|
||||
import * as JsonSocket from 'json-socket';
|
||||
import * as net from 'net';
|
||||
import { tap } from 'rxjs/operators';
|
||||
import { share, tap } from 'rxjs/operators';
|
||||
import {
|
||||
CLOSE_EVENT,
|
||||
ERROR_EVENT,
|
||||
@@ -18,6 +18,7 @@ import { ClientProxy } from './client-proxy';
|
||||
import { ECONNREFUSED } from './constants';
|
||||
|
||||
export class ClientTCP extends ClientProxy {
|
||||
protected connection: Promise<any>;
|
||||
private readonly logger = new Logger(ClientTCP.name);
|
||||
private readonly port: number;
|
||||
private readonly host: string;
|
||||
@@ -35,25 +36,33 @@ export class ClientTCP extends ClientProxy {
|
||||
}
|
||||
|
||||
public connect(): Promise<any> {
|
||||
if (this.isConnected) {
|
||||
return Promise.resolve();
|
||||
if (this.isConnected && this.connection) {
|
||||
return this.connection;
|
||||
}
|
||||
this.socket = this.createSocket();
|
||||
return new Promise((resolve, reject) => {
|
||||
this.bindEvents(this.socket);
|
||||
this.connect$(this.socket._socket)
|
||||
.pipe(tap(() => (this.isConnected = true)))
|
||||
.subscribe(resolve, reject);
|
||||
|
||||
const source$ = this.connect$(this.socket._socket).pipe(
|
||||
tap(() => {
|
||||
this.isConnected = true;
|
||||
this.socket.on(MESSAGE_EVENT, (buffer: WritePacket & PacketId) =>
|
||||
this.handleResponse(buffer),
|
||||
);
|
||||
}),
|
||||
share(),
|
||||
);
|
||||
|
||||
this.socket.connect(this.port, this.host);
|
||||
});
|
||||
this.connection = source$.toPromise();
|
||||
return this.connection;
|
||||
}
|
||||
|
||||
public handleResponse(
|
||||
callback: (packet: WritePacket) => any,
|
||||
buffer: WritePacket,
|
||||
) {
|
||||
const { err, response, isDisposed } = buffer;
|
||||
public handleResponse(buffer: WritePacket & PacketId) {
|
||||
const { err, response, isDisposed, id } = buffer;
|
||||
const callback = this.routingMap.get(id);
|
||||
if (!callback) {
|
||||
return undefined;
|
||||
}
|
||||
if (isDisposed || err) {
|
||||
callback({
|
||||
err,
|
||||
@@ -99,16 +108,11 @@ export class ClientTCP extends ClientProxy {
|
||||
): Function {
|
||||
try {
|
||||
const packet = this.assignPacketId(partialPacket);
|
||||
const listener = (buffer: WritePacket & PacketId) => {
|
||||
if (buffer.id !== packet.id) {
|
||||
return undefined;
|
||||
}
|
||||
this.handleResponse(callback, buffer);
|
||||
};
|
||||
this.socket.on(MESSAGE_EVENT, listener);
|
||||
|
||||
this.routingMap.set(packet.id, callback);
|
||||
this.socket.sendMessage(packet);
|
||||
|
||||
return () => this.socket._socket.removeListener(MESSAGE_EVENT, listener);
|
||||
return () => this.routingMap.delete(packet.id);
|
||||
} catch (err) {
|
||||
callback({ err });
|
||||
}
|
||||
|
||||
@@ -30,10 +30,12 @@ describe('ClientMqtt', () => {
|
||||
removeListenerSpy: sinon.SinonSpy,
|
||||
unsubscribeSpy: sinon.SinonSpy,
|
||||
connectSpy: sinon.SinonStub,
|
||||
assignStub: sinon.SinonStub,
|
||||
mqttClient;
|
||||
|
||||
const id = '1';
|
||||
beforeEach(() => {
|
||||
subscribeSpy = sinon.spy();
|
||||
subscribeSpy = sinon.spy((name, fn) => fn());
|
||||
publishSpy = sinon.spy();
|
||||
onSpy = sinon.spy();
|
||||
removeListenerSpy = sinon.spy();
|
||||
@@ -49,9 +51,13 @@ describe('ClientMqtt', () => {
|
||||
};
|
||||
(client as any).mqttClient = mqttClient;
|
||||
connectSpy = sinon.stub(client, 'connect');
|
||||
assignStub = sinon
|
||||
.stub(client, 'assignPacketId')
|
||||
.callsFake(packet => Object.assign(packet, { id }));
|
||||
});
|
||||
afterEach(() => {
|
||||
connectSpy.restore();
|
||||
assignStub.restore();
|
||||
});
|
||||
it('should subscribe to response pattern name', async () => {
|
||||
await client['publish'](msg, () => {});
|
||||
@@ -62,22 +68,16 @@ describe('ClientMqtt', () => {
|
||||
expect(publishSpy.calledWith(`${pattern}_ack`, JSON.stringify(msg))).to.be
|
||||
.true;
|
||||
});
|
||||
it('should listen on messages', async () => {
|
||||
it('should add callback to routing map', async () => {
|
||||
await client['publish'](msg, () => {});
|
||||
expect(onSpy.called).to.be.true;
|
||||
expect(client['routingMap'].has(id)).to.be.true;
|
||||
});
|
||||
describe('on error', () => {
|
||||
let assignPacketIdStub: sinon.SinonStub;
|
||||
beforeEach(() => {
|
||||
assignPacketIdStub = sinon
|
||||
.stub(client, 'assignPacketId')
|
||||
.callsFake(() => {
|
||||
assignStub.callsFake(() => {
|
||||
throw new Error();
|
||||
});
|
||||
});
|
||||
afterEach(() => {
|
||||
assignPacketIdStub.restore();
|
||||
});
|
||||
|
||||
it('should call callback', () => {
|
||||
const callback = sinon.spy();
|
||||
@@ -88,17 +88,13 @@ describe('ClientMqtt', () => {
|
||||
});
|
||||
});
|
||||
describe('dispose callback', () => {
|
||||
let assignStub: sinon.SinonStub, getResPatternStub: sinon.SinonStub;
|
||||
let getResPatternStub: sinon.SinonStub;
|
||||
let callback: sinon.SinonSpy, subscription;
|
||||
|
||||
const channel = 'channel';
|
||||
const id = '1';
|
||||
|
||||
beforeEach(async () => {
|
||||
callback = sinon.spy();
|
||||
assignStub = sinon
|
||||
.stub(client, 'assignPacketId')
|
||||
.callsFake(packet => Object.assign(packet, { id }));
|
||||
|
||||
getResPatternStub = sinon
|
||||
.stub(client, 'getResPatternName')
|
||||
@@ -107,15 +103,14 @@ describe('ClientMqtt', () => {
|
||||
subscription(channel, JSON.stringify({ isDisposed: true, id }));
|
||||
});
|
||||
afterEach(() => {
|
||||
assignStub.restore();
|
||||
getResPatternStub.restore();
|
||||
});
|
||||
|
||||
it('should unsubscribe to response pattern name', () => {
|
||||
expect(unsubscribeSpy.calledWith(channel)).to.be.true;
|
||||
});
|
||||
it('should remove listener', () => {
|
||||
expect(removeListenerSpy.called).to.be.true;
|
||||
it('should remove callback from routin map', () => {
|
||||
expect(client['routingMap'].has(id)).to.be.false;
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -132,8 +127,9 @@ describe('ClientMqtt', () => {
|
||||
describe('not completed', () => {
|
||||
beforeEach(async () => {
|
||||
callback = sinon.spy();
|
||||
subscription = client.createResponseCallback();
|
||||
|
||||
subscription = client.createResponseCallback(msg, callback);
|
||||
client['routingMap'].set(responseMessage.id, callback);
|
||||
subscription('channel', new Buffer(JSON.stringify(responseMessage)));
|
||||
});
|
||||
it('should call callback with expected arguments', () => {
|
||||
@@ -148,7 +144,9 @@ describe('ClientMqtt', () => {
|
||||
describe('disposed and "id" is correct', () => {
|
||||
beforeEach(async () => {
|
||||
callback = sinon.spy();
|
||||
subscription = client.createResponseCallback(msg, callback);
|
||||
subscription = client.createResponseCallback();
|
||||
|
||||
client['routingMap'].set(responseMessage.id, callback);
|
||||
subscription(
|
||||
'channel',
|
||||
new Buffer(
|
||||
@@ -174,13 +172,9 @@ describe('ClientMqtt', () => {
|
||||
describe('disposed and "id" is incorrect', () => {
|
||||
beforeEach(async () => {
|
||||
callback = sinon.spy();
|
||||
subscription = client.createResponseCallback(
|
||||
{
|
||||
...msg,
|
||||
id: '2',
|
||||
},
|
||||
callback,
|
||||
);
|
||||
subscription = client.createResponseCallback();
|
||||
|
||||
client['routingMap'].set('3', callback);
|
||||
subscription('channel', new Buffer(JSON.stringify(responseMessage)));
|
||||
});
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@ describe('ClientRedis', () => {
|
||||
pub;
|
||||
|
||||
beforeEach(() => {
|
||||
subscribeSpy = sinon.spy();
|
||||
subscribeSpy = sinon.spy((name, fn) => fn());
|
||||
publishSpy = sinon.spy();
|
||||
onSpy = sinon.spy();
|
||||
removeListenerSpy = sinon.spy();
|
||||
@@ -64,10 +64,6 @@ describe('ClientRedis', () => {
|
||||
expect(publishSpy.calledWith(`${pattern}_ack`, JSON.stringify(msg))).to.be
|
||||
.true;
|
||||
});
|
||||
it('should listen on messages', () => {
|
||||
client['publish'](msg, () => {});
|
||||
expect(onSpy.called).to.be.true;
|
||||
});
|
||||
describe('on error', () => {
|
||||
let assignPacketIdStub: sinon.SinonStub;
|
||||
beforeEach(() => {
|
||||
@@ -116,8 +112,8 @@ describe('ClientRedis', () => {
|
||||
it('should unsubscribe to response pattern name', () => {
|
||||
expect(unsubscribeSpy.calledWith(channel)).to.be.true;
|
||||
});
|
||||
it('should remove listener', () => {
|
||||
expect(removeListenerSpy.called).to.be.true;
|
||||
it('should clean routingMap', () => {
|
||||
expect(client['routingMap'].has(id)).to.be.false;
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -135,7 +131,8 @@ describe('ClientRedis', () => {
|
||||
beforeEach(async () => {
|
||||
callback = sinon.spy();
|
||||
|
||||
subscription = client.createResponseCallback(msg, callback);
|
||||
subscription = client.createResponseCallback();
|
||||
client['routingMap'].set(responseMessage.id, callback);
|
||||
subscription('channel', new Buffer(JSON.stringify(responseMessage)));
|
||||
});
|
||||
it('should call callback with expected arguments', () => {
|
||||
@@ -150,7 +147,8 @@ describe('ClientRedis', () => {
|
||||
describe('disposed and "id" is correct', () => {
|
||||
beforeEach(async () => {
|
||||
callback = sinon.spy();
|
||||
subscription = client.createResponseCallback(msg, callback);
|
||||
subscription = client.createResponseCallback();
|
||||
client['routingMap'].set(responseMessage.id, callback);
|
||||
subscription(
|
||||
'channel',
|
||||
new Buffer(
|
||||
@@ -176,13 +174,7 @@ describe('ClientRedis', () => {
|
||||
describe('disposed and "id" is incorrect', () => {
|
||||
beforeEach(async () => {
|
||||
callback = sinon.spy();
|
||||
subscription = client.createResponseCallback(
|
||||
{
|
||||
...msg,
|
||||
id: '2',
|
||||
},
|
||||
callback,
|
||||
);
|
||||
subscription = client.createResponseCallback();
|
||||
subscription('channel', new Buffer(JSON.stringify(responseMessage)));
|
||||
});
|
||||
|
||||
@@ -232,6 +224,7 @@ describe('ClientRedis', () => {
|
||||
removeListener: () => null,
|
||||
}));
|
||||
handleErrorsSpy = sinon.spy(client, 'handleError');
|
||||
|
||||
client.connect();
|
||||
client['pubClient'] = null;
|
||||
});
|
||||
|
||||
@@ -10,7 +10,7 @@ describe('ClientTCP', () => {
|
||||
connect: sinon.SinonStub;
|
||||
publish: sinon.SinonSpy;
|
||||
_socket: {
|
||||
addListener: sinon.SinonStub,
|
||||
addListener: sinon.SinonStub;
|
||||
removeListener: sinon.SinonSpy;
|
||||
once: sinon.SinonStub;
|
||||
};
|
||||
@@ -54,14 +54,11 @@ describe('ClientTCP', () => {
|
||||
it('should send message', () => {
|
||||
client['publish'](msg, () => ({}));
|
||||
});
|
||||
it('should listen on messages', () => {
|
||||
client['publish'](msg, () => ({}));
|
||||
expect(socket.on.called).to.be.true;
|
||||
});
|
||||
describe('on dispose', () => {
|
||||
it('should remove listener', () => {
|
||||
it('should remove listener from routing map', () => {
|
||||
client['publish'](msg, () => ({}))();
|
||||
expect(socket._socket.removeListener.called).to.be.true;
|
||||
|
||||
expect(client['routingMap'].size).to.be.eq(0);
|
||||
});
|
||||
});
|
||||
describe('on error', () => {
|
||||
@@ -78,10 +75,13 @@ describe('ClientTCP', () => {
|
||||
});
|
||||
describe('handleResponse', () => {
|
||||
let callback;
|
||||
const id = '1';
|
||||
|
||||
describe('when disposed', () => {
|
||||
beforeEach(() => {
|
||||
callback = sinon.spy();
|
||||
client.handleResponse(callback, { isDisposed: true });
|
||||
client['routingMap'].set(id, callback);
|
||||
client.handleResponse({ id, isDisposed: true });
|
||||
});
|
||||
it('should emit disposed callback', () => {
|
||||
expect(callback.called).to.be.true;
|
||||
@@ -97,9 +97,10 @@ describe('ClientTCP', () => {
|
||||
describe('when not disposed', () => {
|
||||
let buffer;
|
||||
beforeEach(() => {
|
||||
buffer = { err: null, response: 'res' };
|
||||
buffer = { id, err: null, response: 'res' };
|
||||
callback = sinon.spy();
|
||||
client.handleResponse(callback, buffer);
|
||||
client['routingMap'].set(id, callback);
|
||||
client.handleResponse(buffer);
|
||||
});
|
||||
it('should not end server', () => {
|
||||
expect(socket.end.called).to.be.false;
|
||||
@@ -148,6 +149,9 @@ describe('ClientTCP', () => {
|
||||
it('should call "connect$" once', async () => {
|
||||
expect(connect$Stub.called).to.be.true;
|
||||
});
|
||||
it('should listen on messages', () => {
|
||||
expect(socket.on.called).to.be.true;
|
||||
});
|
||||
});
|
||||
describe('when is connected', () => {
|
||||
beforeEach(() => {
|
||||
|
||||
Reference in New Issue
Block a user