mirror of
https://github.com/nestjs/nest.git
synced 2026-02-21 23:11:44 +00:00
refactor(microservices) partially fixed rabbitmq transport (+tests)
This commit is contained in:
@@ -1,7 +1,6 @@
|
||||
import { Transport } from '../../enums/transport.enum';
|
||||
import { MqttClientOptions } from '../external/mqtt-options.interface';
|
||||
import { CustomTransportStrategy } from './custom-transport-strategy.interface';
|
||||
import { Options } from 'amqplib';
|
||||
|
||||
export type MicroserviceOptions =
|
||||
| GrpcOptions
|
||||
@@ -38,7 +37,7 @@ export interface GrpcOptions {
|
||||
oneofs?: boolean;
|
||||
json?: boolean;
|
||||
includeDirs?: string[];
|
||||
}
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
@@ -88,6 +87,6 @@ export interface RmqOptions {
|
||||
queue?: string;
|
||||
prefetchCount?: number;
|
||||
isGlobalPrefetchCount?: boolean;
|
||||
queueOptions?: Options.AssertQueue;
|
||||
queueOptions?: any;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,133 +1,174 @@
|
||||
import { Connection, Options } from 'amqplib';
|
||||
import { ERROR_EVENT, RQM_DEFAULT_URL, RQM_DEFAULT_QUEUE, RQM_DEFAULT_PREFETCH_COUNT, RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT, RQM_DEFAULT_QUEUE_OPTIONS, CONNECT_EVENT, DISCONNECT_EVENT } from './../constants';
|
||||
import { Logger } from '@nestjs/common/services/logger.service';
|
||||
import { loadPackage } from '@nestjs/common/utils/load-package.util';
|
||||
import { ClientProxy } from './client-proxy';
|
||||
import { ClientOptions, RmqOptions } from '../interfaces';
|
||||
import { WritePacket } from './../interfaces';
|
||||
import { EventEmitter } from 'events';
|
||||
import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util';
|
||||
import * as amqp from 'amqp-connection-manager';
|
||||
import { EventEmitter } from 'events';
|
||||
import { fromEvent, merge, Observable } from 'rxjs';
|
||||
import { first, map, share, switchMap } from 'rxjs/operators';
|
||||
import { ClientOptions, RmqOptions } from '../interfaces';
|
||||
import {
|
||||
DISCONNECT_EVENT,
|
||||
ERROR_EVENT,
|
||||
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
|
||||
RQM_DEFAULT_PREFETCH_COUNT,
|
||||
RQM_DEFAULT_QUEUE,
|
||||
RQM_DEFAULT_QUEUE_OPTIONS,
|
||||
RQM_DEFAULT_URL,
|
||||
} from './../constants';
|
||||
import { WritePacket } from './../interfaces';
|
||||
import { ClientProxy } from './client-proxy';
|
||||
|
||||
let rqmPackage: any = {};
|
||||
|
||||
export class ClientRMQ extends ClientProxy {
|
||||
private readonly logger = new Logger(ClientProxy.name);
|
||||
private client: any = null;
|
||||
private channel: any = null;
|
||||
private urls: string[];
|
||||
private queue: string;
|
||||
private prefetchCount: number;
|
||||
private isGlobalPrefetchCount: boolean;
|
||||
private queueOptions: Options.AssertQueue;
|
||||
private replyQueue: string;
|
||||
private responseEmitter: EventEmitter;
|
||||
protected readonly logger = new Logger(ClientProxy.name);
|
||||
protected connection: Promise<any>;
|
||||
protected client: any = null;
|
||||
protected channel: any = null;
|
||||
protected urls: string[];
|
||||
protected queue: string;
|
||||
protected prefetchCount: number;
|
||||
protected isGlobalPrefetchCount: boolean;
|
||||
protected queueOptions: any;
|
||||
protected replyQueue: string;
|
||||
protected responseEmitter: EventEmitter;
|
||||
|
||||
constructor(
|
||||
private readonly options: ClientOptions['options']) {
|
||||
super();
|
||||
this.urls =
|
||||
this.getOptionsProp<RmqOptions>(this.options, 'urls') || [RQM_DEFAULT_URL];
|
||||
this.queue =
|
||||
this.getOptionsProp<RmqOptions>(this.options, 'queue') || RQM_DEFAULT_QUEUE;
|
||||
this.prefetchCount =
|
||||
this.getOptionsProp<RmqOptions>(this.options, 'prefetchCount') || RQM_DEFAULT_PREFETCH_COUNT;
|
||||
this.isGlobalPrefetchCount =
|
||||
this.getOptionsProp<RmqOptions>(this.options, 'isGlobalPrefetchCount') || RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;
|
||||
this.queueOptions =
|
||||
this.getOptionsProp<RmqOptions>(this.options, 'queueOptions') || RQM_DEFAULT_QUEUE_OPTIONS;
|
||||
rqmPackage = loadPackage('amqplib', ClientRMQ.name);
|
||||
this.connect();
|
||||
constructor(protected readonly options: ClientOptions['options']) {
|
||||
super();
|
||||
this.urls = this.getOptionsProp<RmqOptions>(this.options, 'urls') || [
|
||||
RQM_DEFAULT_URL,
|
||||
];
|
||||
this.queue =
|
||||
this.getOptionsProp<RmqOptions>(this.options, 'queue') ||
|
||||
RQM_DEFAULT_QUEUE;
|
||||
this.prefetchCount =
|
||||
this.getOptionsProp<RmqOptions>(this.options, 'prefetchCount') ||
|
||||
RQM_DEFAULT_PREFETCH_COUNT;
|
||||
this.isGlobalPrefetchCount =
|
||||
this.getOptionsProp<RmqOptions>(this.options, 'isGlobalPrefetchCount') ||
|
||||
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;
|
||||
this.queueOptions =
|
||||
this.getOptionsProp<RmqOptions>(this.options, 'queueOptions') ||
|
||||
RQM_DEFAULT_QUEUE_OPTIONS;
|
||||
|
||||
rqmPackage = loadPackage('amqplib', ClientRMQ.name);
|
||||
}
|
||||
|
||||
public close(): void {
|
||||
this.channel && this.channel.close();
|
||||
this.client && this.client.close();
|
||||
}
|
||||
|
||||
public listen() {
|
||||
this.channel.addSetup(channel =>
|
||||
channel.consume(
|
||||
this.replyQueue,
|
||||
msg => {
|
||||
this.responseEmitter.emit(msg.properties.correlationId, msg);
|
||||
},
|
||||
{ noAck: true },
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
public connect(): Promise<any> {
|
||||
if (this.client && this.channel) {
|
||||
return this.connection;
|
||||
}
|
||||
this.client = this.createClient();
|
||||
this.handleError(this.client);
|
||||
|
||||
public close(): void {
|
||||
this.channel && this.channel.close();
|
||||
this.client && this.client.close();
|
||||
const connect$ = this.connect$(this.client);
|
||||
this.connection = this.mergeDisconnectEvent(this.client, connect$)
|
||||
.pipe(
|
||||
switchMap(() => {
|
||||
return new Promise(resolve =>
|
||||
this.client.createChannel({
|
||||
json: false,
|
||||
setup: async channel => this.setupChannel(channel, resolve),
|
||||
}),
|
||||
);
|
||||
}),
|
||||
share(),
|
||||
)
|
||||
.toPromise();
|
||||
return this.connection;
|
||||
}
|
||||
|
||||
public createClient<T = any>(): T {
|
||||
return amqp.connect(this.urls) as T;
|
||||
}
|
||||
|
||||
public mergeDisconnectEvent<T = any>(
|
||||
instance: any,
|
||||
source$: Observable<T>,
|
||||
): Observable<T> {
|
||||
const close$ = fromEvent(instance, DISCONNECT_EVENT).pipe(
|
||||
map(err => {
|
||||
throw err;
|
||||
}),
|
||||
);
|
||||
return merge(source$, close$).pipe(first());
|
||||
}
|
||||
|
||||
public async setupChannel(channel: any, resolve: Function) {
|
||||
await channel.assertQueue(this.queue, this.queueOptions);
|
||||
await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
|
||||
this.replyQueue = (await channel.assertQueue('', {
|
||||
exclusive: true,
|
||||
})).queue;
|
||||
|
||||
this.responseEmitter = new EventEmitter();
|
||||
this.responseEmitter.setMaxListeners(0);
|
||||
this.listen();
|
||||
|
||||
resolve();
|
||||
}
|
||||
|
||||
protected publish(
|
||||
message: any,
|
||||
callback: (packet: WritePacket) => any,
|
||||
): Function {
|
||||
try {
|
||||
const correlationId = randomStringGenerator();
|
||||
const listener = msg => {
|
||||
const { content } = msg;
|
||||
this.handleMessage(content, callback);
|
||||
};
|
||||
this.responseEmitter.on(correlationId, listener);
|
||||
this.channel.sendToQueue(
|
||||
this.queue,
|
||||
Buffer.from(JSON.stringify(message)),
|
||||
{
|
||||
replyTo: this.replyQueue,
|
||||
correlationId,
|
||||
},
|
||||
);
|
||||
return () => this.responseEmitter.removeListener(correlationId, listener);
|
||||
} catch (err) {
|
||||
callback({ err });
|
||||
}
|
||||
}
|
||||
|
||||
public listen() {
|
||||
this.channel.addSetup((channel) => {
|
||||
return Promise.all([
|
||||
channel.consume(this.replyQueue, (msg) => {
|
||||
this.responseEmitter.emit(msg.properties.correlationId, msg);
|
||||
}, { noAck: true }),
|
||||
]);
|
||||
});
|
||||
public handleMessage(
|
||||
packet: WritePacket,
|
||||
callback: (packet: WritePacket) => any,
|
||||
) {
|
||||
const { err, response, isDisposed } = packet;
|
||||
if (isDisposed || err) {
|
||||
callback({
|
||||
err,
|
||||
response: null,
|
||||
isDisposed: true,
|
||||
});
|
||||
}
|
||||
callback({
|
||||
err,
|
||||
response,
|
||||
});
|
||||
}
|
||||
|
||||
public connect(): Promise<any> {
|
||||
if (this.client && this.channel) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
return new Promise(async (resolve, reject) => {
|
||||
this.client = amqp.connect(this.urls);
|
||||
this.client.on(CONNECT_EVENT, x => {
|
||||
this.channel = this.client.createChannel({
|
||||
json: false,
|
||||
setup: async (channel) => {
|
||||
await channel.assertQueue(this.queue, this.queueOptions);
|
||||
await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
|
||||
this.replyQueue = (await channel.assertQueue('', { exclusive: true })).queue;
|
||||
this.responseEmitter = new EventEmitter();
|
||||
this.responseEmitter.setMaxListeners(0);
|
||||
this.listen();
|
||||
resolve();
|
||||
},
|
||||
});
|
||||
});
|
||||
this.client.on(DISCONNECT_EVENT, err => {
|
||||
reject(err);
|
||||
this.client.close();
|
||||
this.client = null;
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
protected publish(messageObj, callback: (packet: WritePacket) => any) {
|
||||
if (!this.client) {
|
||||
this.connect().then(x => {
|
||||
this.sendMessage(messageObj, callback);
|
||||
});
|
||||
} else {
|
||||
this.sendMessage(messageObj, callback);
|
||||
}
|
||||
}
|
||||
|
||||
private sendMessage(messageObj, callback: (packet: WritePacket) => any) {
|
||||
try {
|
||||
const correlationId = Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15);
|
||||
this.responseEmitter.on(correlationId, msg => {
|
||||
const { content } = msg;
|
||||
this.handleMessage(content, callback);
|
||||
});
|
||||
this.channel.sendToQueue(this.queue, Buffer.from(JSON.stringify(messageObj)), {
|
||||
replyTo: this.replyQueue,
|
||||
correlationId,
|
||||
});
|
||||
} catch (err) {
|
||||
this.logger.error(err);
|
||||
callback({ err });
|
||||
}
|
||||
}
|
||||
|
||||
public handleMessage(
|
||||
msg: WritePacket,
|
||||
callback: (packet: WritePacket) => any,
|
||||
) {
|
||||
const { err, response, isDisposed } = JSON.parse(msg.toString());
|
||||
if (isDisposed || err) {
|
||||
callback({
|
||||
err,
|
||||
response: null,
|
||||
isDisposed: true,
|
||||
});
|
||||
}
|
||||
callback({
|
||||
err,
|
||||
response,
|
||||
});
|
||||
}
|
||||
|
||||
public handleError(client: Connection): void {
|
||||
client.addListener(ERROR_EVENT, err => this.logger.error(err));
|
||||
}
|
||||
}
|
||||
public handleError(client: any): void {
|
||||
client.addListener(ERROR_EVENT, err => this.logger.error(err));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
export const ECONNREFUSED = 'ECONNREFUSED';
|
||||
export const CONN_ERR = 'CONN_ERR';
|
||||
export const GRPC_CANCELLED = 'Cancelled';
|
||||
export const GRPC_CANCELLED = 'Cancelled';
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import { MqttClientOptions } from '@nestjs/common/interfaces/external/mqtt-options.interface';
|
||||
import { Transport } from '../enums/transport.enum';
|
||||
import { Server } from './../server/server';
|
||||
import { Options } from 'amqplib';
|
||||
import { CustomTransportStrategy } from './custom-transport-strategy.interface';
|
||||
|
||||
export type MicroserviceOptions =
|
||||
@@ -39,7 +38,7 @@ export interface GrpcOptions {
|
||||
oneofs?: boolean;
|
||||
json?: boolean;
|
||||
includeDirs?: string[];
|
||||
}
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
@@ -89,6 +88,6 @@ export interface RmqOptions {
|
||||
queue?: string;
|
||||
prefetchCount?: number;
|
||||
isGlobalPrefetchCount?: boolean;
|
||||
queueOptions?: Options.AssertQueue;
|
||||
queueOptions?: any;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,81 +1,115 @@
|
||||
import { Server } from './server';
|
||||
import { Options } from 'amqplib';
|
||||
import { DISCONNECTED_RMQ_MESSAGE, DISCONNECT_EVENT, RQM_DEFAULT_URL, RQM_DEFAULT_QUEUE, RQM_DEFAULT_PREFETCH_COUNT, RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT, RQM_DEFAULT_QUEUE_OPTIONS, CONNECT_EVENT } from './../constants';
|
||||
import { CustomTransportStrategy, RmqOptions } from './../interfaces';
|
||||
import { MicroserviceOptions } from '../interfaces/microservice-configuration.interface';
|
||||
import { loadPackage } from '@nestjs/common/utils/load-package.util';
|
||||
import { Observable } from 'rxjs';
|
||||
import * as amqp from 'amqp-connection-manager';
|
||||
import { Observable } from 'rxjs';
|
||||
import { MicroserviceOptions } from '../interfaces/microservice-configuration.interface';
|
||||
import {
|
||||
CONNECT_EVENT,
|
||||
DISCONNECTED_RMQ_MESSAGE,
|
||||
DISCONNECT_EVENT,
|
||||
NO_PATTERN_MESSAGE,
|
||||
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
|
||||
RQM_DEFAULT_PREFETCH_COUNT,
|
||||
RQM_DEFAULT_QUEUE,
|
||||
RQM_DEFAULT_QUEUE_OPTIONS,
|
||||
RQM_DEFAULT_URL,
|
||||
} from './../constants';
|
||||
import { CustomTransportStrategy, RmqOptions } from './../interfaces';
|
||||
import { Server } from './server';
|
||||
|
||||
let rqmPackage: any = {};
|
||||
|
||||
export class ServerRMQ extends Server implements CustomTransportStrategy {
|
||||
private server: any = null;
|
||||
private channel: any = null;
|
||||
private urls: string[];
|
||||
private queue: string;
|
||||
private prefetchCount: number;
|
||||
private queueOptions: Options.AssertQueue;
|
||||
private isGlobalPrefetchCount: boolean;
|
||||
private server: any = null;
|
||||
private channel: any = null;
|
||||
private urls: string[];
|
||||
private queue: string;
|
||||
private prefetchCount: number;
|
||||
private queueOptions: any;
|
||||
private isGlobalPrefetchCount: boolean;
|
||||
|
||||
constructor(private readonly options: MicroserviceOptions) {
|
||||
super();
|
||||
this.urls =
|
||||
this.getOptionsProp<RmqOptions>(this.options, 'urls') || [RQM_DEFAULT_URL];
|
||||
this.queue =
|
||||
this.getOptionsProp<RmqOptions>(this.options, 'queue') || RQM_DEFAULT_QUEUE;
|
||||
this.prefetchCount =
|
||||
this.getOptionsProp<RmqOptions>(this.options, 'prefetchCount') || RQM_DEFAULT_PREFETCH_COUNT;
|
||||
this.isGlobalPrefetchCount =
|
||||
this.getOptionsProp<RmqOptions>(this.options, 'isGlobalPrefetchCount') || RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;
|
||||
this.queueOptions =
|
||||
this.getOptionsProp<RmqOptions>(this.options, 'queueOptions') || RQM_DEFAULT_QUEUE_OPTIONS;
|
||||
rqmPackage = loadPackage('amqplib', ServerRMQ.name);
|
||||
constructor(private readonly options: MicroserviceOptions) {
|
||||
super();
|
||||
this.urls = this.getOptionsProp<RmqOptions>(this.options, 'urls') || [
|
||||
RQM_DEFAULT_URL,
|
||||
];
|
||||
this.queue =
|
||||
this.getOptionsProp<RmqOptions>(this.options, 'queue') ||
|
||||
RQM_DEFAULT_QUEUE;
|
||||
this.prefetchCount =
|
||||
this.getOptionsProp<RmqOptions>(this.options, 'prefetchCount') ||
|
||||
RQM_DEFAULT_PREFETCH_COUNT;
|
||||
this.isGlobalPrefetchCount =
|
||||
this.getOptionsProp<RmqOptions>(this.options, 'isGlobalPrefetchCount') ||
|
||||
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;
|
||||
this.queueOptions =
|
||||
this.getOptionsProp<RmqOptions>(this.options, 'queueOptions') ||
|
||||
RQM_DEFAULT_QUEUE_OPTIONS;
|
||||
|
||||
rqmPackage = loadPackage('amqplib', ServerRMQ.name);
|
||||
}
|
||||
|
||||
public async listen(callback: () => void): Promise<void> {
|
||||
await this.start(callback);
|
||||
}
|
||||
|
||||
public close(): void {
|
||||
this.channel && this.channel.close();
|
||||
this.server && this.server.close();
|
||||
}
|
||||
|
||||
private async start(callback?: () => void) {
|
||||
this.server = amqp.connect(this.urls);
|
||||
this.server.on(CONNECT_EVENT, _ => {
|
||||
this.channel = this.server.createChannel({
|
||||
json: false,
|
||||
setup: async channel => {
|
||||
await channel.assertQueue(this.queue, this.queueOptions);
|
||||
await channel.prefetch(
|
||||
this.prefetchCount,
|
||||
this.isGlobalPrefetchCount,
|
||||
);
|
||||
channel.consume(this.queue, msg => this.handleMessage(msg), {
|
||||
noAck: true,
|
||||
});
|
||||
callback();
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
this.server.on(DISCONNECT_EVENT, err => {
|
||||
this.logger.error(DISCONNECTED_RMQ_MESSAGE);
|
||||
});
|
||||
}
|
||||
|
||||
private async handleMessage(message: any): Promise<void> {
|
||||
const { content, properties } = message;
|
||||
const packet = JSON.parse(content.toString());
|
||||
const pattern = JSON.stringify(packet.pattern);
|
||||
const handler = this.getHandlerByPattern(pattern);
|
||||
|
||||
if (!handler) {
|
||||
const status = 'error';
|
||||
return this.sendMessage(
|
||||
{ status, err: NO_PATTERN_MESSAGE },
|
||||
properties.replyTo,
|
||||
properties.correlationId,
|
||||
);
|
||||
}
|
||||
const response$ = this.transformToObservable(
|
||||
await handler(packet.data),
|
||||
) as Observable<any>;
|
||||
response$ &&
|
||||
this.send(response$, data =>
|
||||
this.sendMessage(data, properties.replyTo, properties.correlationId),
|
||||
);
|
||||
}
|
||||
|
||||
public async listen(callback: () => void): Promise<void> {
|
||||
await this.start(callback);
|
||||
}
|
||||
|
||||
public close(): void {
|
||||
this.channel && this.channel.close();
|
||||
this.server && this.server.close();
|
||||
}
|
||||
|
||||
private async start(callback?: () => void) {
|
||||
this.server = amqp.connect(this.urls);
|
||||
this.server.on(CONNECT_EVENT, x => {
|
||||
this.channel = this.server.createChannel({
|
||||
json: false,
|
||||
setup: async (channel) => {
|
||||
await channel.assertQueue(this.queue, this.queueOptions);
|
||||
await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
|
||||
channel.consume(this.queue, (msg) => this.handleMessage(msg), { noAck: true });
|
||||
callback();
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
this.server.on(DISCONNECT_EVENT, err => {
|
||||
this.logger.error(DISCONNECTED_RMQ_MESSAGE);
|
||||
});
|
||||
}
|
||||
|
||||
private async handleMessage(message): Promise<void> {
|
||||
const { content, properties } = message;
|
||||
const messageObj = JSON.parse(content.toString());
|
||||
const handlers = this.getHandlers();
|
||||
const pattern = JSON.stringify(messageObj.pattern);
|
||||
if (!this.messageHandlers[pattern]) {
|
||||
return;
|
||||
}
|
||||
const handler = this.messageHandlers[pattern];
|
||||
const response$ = this.transformToObservable(await handler(messageObj.data)) as Observable<any>;
|
||||
response$ && this.send(response$, (data) => this.sendMessage(data, properties.replyTo, properties.correlationId));
|
||||
}
|
||||
|
||||
private sendMessage(message, replyTo, correlationId): void {
|
||||
const buffer = Buffer.from(JSON.stringify(message));
|
||||
this.channel.sendToQueue(replyTo, buffer, { correlationId });
|
||||
}
|
||||
private sendMessage<T = any>(
|
||||
message: T,
|
||||
replyTo: any,
|
||||
correlationId: string,
|
||||
): void {
|
||||
const buffer = Buffer.from(JSON.stringify(message));
|
||||
this.channel.sendToQueue(replyTo, buffer, { correlationId });
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,145 +1,181 @@
|
||||
import * as sinon from 'sinon';
|
||||
import { expect } from 'chai';
|
||||
import { ClientRMQ } from '../../client/client-rmq';
|
||||
import { EventEmitter } from 'events';
|
||||
import { empty } from 'rxjs';
|
||||
import * as sinon from 'sinon';
|
||||
import { ClientRMQ } from '../../client/client-rmq';
|
||||
// tslint:disable:no-string-literal
|
||||
|
||||
describe('ClientRQM', () => {
|
||||
const test = 'test';
|
||||
const client = new ClientRMQ({});
|
||||
const client = new ClientRMQ({});
|
||||
|
||||
describe('connect', () => {
|
||||
let createChannelSpy: sinon.SinonSpy,
|
||||
assertQueueSpy: sinon.SinonSpy,
|
||||
listenSpy: sinon.SinonSpy;
|
||||
describe('connect', () => {
|
||||
let createClientStub: sinon.SinonStub;
|
||||
let handleErrorsSpy: sinon.SinonSpy;
|
||||
let connect$Stub: sinon.SinonStub;
|
||||
let mergeDisconnectEvent: sinon.SinonStub;
|
||||
|
||||
beforeEach( async () => {
|
||||
(client as any).client = {
|
||||
createChannel: createChannelSpy,
|
||||
};
|
||||
(client as any).channel = {
|
||||
assertQueue: assertQueueSpy
|
||||
};
|
||||
(client as any).listen = listenSpy;
|
||||
});
|
||||
beforeEach(async () => {
|
||||
createClientStub = sinon.stub(client, 'createClient').callsFake(() => ({
|
||||
addListener: () => ({}),
|
||||
removeListener: () => ({}),
|
||||
}));
|
||||
handleErrorsSpy = sinon.spy(client, 'handleError');
|
||||
connect$Stub = sinon.stub(client, 'connect$').callsFake(() => ({
|
||||
subscribe: resolve => resolve(),
|
||||
toPromise() {
|
||||
return this;
|
||||
},
|
||||
pipe() {
|
||||
return this;
|
||||
},
|
||||
}));
|
||||
mergeDisconnectEvent = sinon
|
||||
.stub(client, 'mergeDisconnectEvent')
|
||||
.callsFake((_, source) => source);
|
||||
});
|
||||
afterEach(() => {
|
||||
createClientStub.restore();
|
||||
handleErrorsSpy.restore();
|
||||
connect$Stub.restore();
|
||||
mergeDisconnectEvent.restore();
|
||||
});
|
||||
describe('when is not connected', () => {
|
||||
beforeEach(async () => {
|
||||
client['mqttClient'] = null;
|
||||
await client.connect();
|
||||
});
|
||||
it('should call "handleError" once', async () => {
|
||||
expect(handleErrorsSpy.called).to.be.true;
|
||||
});
|
||||
it('should call "createClient" once', async () => {
|
||||
expect(createClientStub.called).to.be.true;
|
||||
});
|
||||
it('should call "connect$" once', async () => {
|
||||
expect(connect$Stub.called).to.be.true;
|
||||
});
|
||||
});
|
||||
describe('when is connected', () => {
|
||||
beforeEach(() => {
|
||||
client['client'] = { test: true } as any;
|
||||
});
|
||||
it('should not call "createClient"', () => {
|
||||
expect(createClientStub.called).to.be.false;
|
||||
});
|
||||
it('should not call "handleError"', () => {
|
||||
expect(handleErrorsSpy.called).to.be.false;
|
||||
});
|
||||
it('should not call "connect$"', () => {
|
||||
expect(connect$Stub.called).to.be.false;
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it('should create channel on connect()', () => {
|
||||
client['connect']().then(() => {
|
||||
expect(createChannelSpy.called).to.be.true;
|
||||
});
|
||||
});
|
||||
describe('mergeDisconnectEvent', () => {
|
||||
it('should merge disconnect event', () => {
|
||||
const error = new Error();
|
||||
const instance: any = {
|
||||
on: (ev, callback) => callback(error),
|
||||
off: () => ({}),
|
||||
};
|
||||
client
|
||||
.mergeDisconnectEvent(instance as any, empty())
|
||||
.subscribe(null, err => expect(err).to.be.eql(error));
|
||||
});
|
||||
});
|
||||
|
||||
it('should create assert queues on connect()', () => {
|
||||
client['connect']().then(() => {
|
||||
expect(assertQueueSpy.called).to.be.true;
|
||||
});
|
||||
});
|
||||
describe('publish', () => {
|
||||
const pattern = 'test';
|
||||
const msg = { pattern, data: 'data' };
|
||||
let connectSpy: sinon.SinonSpy,
|
||||
sendToQueueSpy: sinon.SinonSpy,
|
||||
eventSpy: sinon.SinonSpy;
|
||||
|
||||
it('should call listen() on connect()', () => {
|
||||
client['connect']().then(() => {
|
||||
expect(listenSpy.called).to.be.true;
|
||||
});
|
||||
});
|
||||
beforeEach(() => {
|
||||
connectSpy = sinon.spy(client, 'connect');
|
||||
eventSpy = sinon.spy();
|
||||
sendToQueueSpy = sinon.spy();
|
||||
|
||||
(client as any).client = {};
|
||||
(client as any).channel = {
|
||||
sendToQueue: sendToQueueSpy,
|
||||
};
|
||||
(client as any).responseEmitter = new EventEmitter();
|
||||
(client as any).responseEmitter.on('test', eventSpy);
|
||||
});
|
||||
|
||||
describe('publish', () => {
|
||||
const pattern = 'test';
|
||||
const msg = { pattern, data: 'data' };
|
||||
let connectSpy: sinon.SinonSpy,
|
||||
sendToQueueSpy: sinon.SinonSpy,
|
||||
eventSpy: sinon.SinonSpy;
|
||||
|
||||
beforeEach(() => {
|
||||
connectSpy = sinon.spy(client, 'connect');
|
||||
eventSpy = sinon.spy();
|
||||
|
||||
(client as any).client = {};
|
||||
(client as any).channel = {
|
||||
sendToQueue: sendToQueueSpy
|
||||
};
|
||||
(client as any).responseEmitter = new EventEmitter();
|
||||
(client as any).responseEmitter.on('test', eventSpy)
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
connectSpy.restore();
|
||||
});
|
||||
|
||||
it('should not call "connect()" when client not null', () => {
|
||||
client['publish'](msg, () => {});
|
||||
expect(connectSpy.called).to.be.false;
|
||||
});
|
||||
|
||||
it('should call "connect()" when client is null', () => {
|
||||
(client as any).client = null;
|
||||
client['publish'](msg, () => {});
|
||||
expect(connectSpy.called).to.be.true;
|
||||
});
|
||||
|
||||
it('should invoke callback on event', () => {
|
||||
(client as any).client = null;
|
||||
client['publish'](msg, () => {
|
||||
(client as any).responseEmitter.emit('test');
|
||||
});
|
||||
expect(eventSpy.called).to.be.true;
|
||||
});
|
||||
|
||||
it('should send message', () => {
|
||||
client['publish'](msg, () => {
|
||||
expect(sendToQueueSpy.called).to.be.true;
|
||||
});
|
||||
});
|
||||
afterEach(() => {
|
||||
connectSpy.restore();
|
||||
});
|
||||
|
||||
describe('handleMessage', () => {
|
||||
const msg = {};
|
||||
let callbackSpy: sinon.SinonSpy;
|
||||
let deleteQueueSpy: sinon.SinonSpy;
|
||||
let callback = (data) => {};
|
||||
it('should send message', () => {
|
||||
client['publish'](msg, () => {
|
||||
expect(sendToQueueSpy.called).to.be.true;
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
callbackSpy = sinon.spy();
|
||||
deleteQueueSpy = sinon.spy();
|
||||
(client as any).channel = { deleteQueue: deleteQueueSpy };
|
||||
callback = callbackSpy;
|
||||
});
|
||||
describe('handleMessage', () => {
|
||||
const msg: any = {};
|
||||
let callbackSpy: sinon.SinonSpy;
|
||||
let deleteQueueSpy: sinon.SinonSpy;
|
||||
let callback = data => {};
|
||||
|
||||
it('should callback if no error or isDisposed', () => {
|
||||
msg.content = JSON.stringify({ err: null, response: 'test', isDisposed: false });
|
||||
client['handleMessage'](msg, callback);
|
||||
expect(callbackSpy.called).to.be.true;
|
||||
});
|
||||
|
||||
it('should callback if error', () => {
|
||||
msg.content = JSON.stringify({ err: true, response: 'test', isDisposed: false });
|
||||
client['handleMessage'](msg, callback);
|
||||
expect(callbackSpy.called).to.be.true;
|
||||
});
|
||||
|
||||
it('should callback if isDisposed', () => {
|
||||
msg.content = JSON.stringify({ err: null, response: 'test', isDisposed: true });
|
||||
client['handleMessage'](msg, callback);
|
||||
expect(callbackSpy.called).to.be.true;
|
||||
});
|
||||
beforeEach(() => {
|
||||
callbackSpy = sinon.spy();
|
||||
deleteQueueSpy = sinon.spy();
|
||||
(client as any).channel = { deleteQueue: deleteQueueSpy };
|
||||
callback = callbackSpy;
|
||||
});
|
||||
|
||||
describe('close', () => {
|
||||
let channelCloseSpy: sinon.SinonSpy;
|
||||
let clientCloseSpy: sinon.SinonSpy;
|
||||
beforeEach(() => {
|
||||
channelCloseSpy = sinon.spy();
|
||||
clientCloseSpy = sinon.spy();
|
||||
(client as any).channel = { close: channelCloseSpy };
|
||||
(client as any).client = { close: clientCloseSpy };
|
||||
});
|
||||
|
||||
it('should close channel when it is not null', () => {
|
||||
client.close();
|
||||
expect(channelCloseSpy.called).to.be.true;
|
||||
});
|
||||
|
||||
it('should close client when it is not null', () => {
|
||||
client.close();
|
||||
expect(clientCloseSpy.called).to.be.true;
|
||||
});
|
||||
it('should callback if no error or isDisposed', () => {
|
||||
msg.content = JSON.stringify({
|
||||
err: null,
|
||||
response: 'test',
|
||||
isDisposed: false,
|
||||
});
|
||||
client.handleMessage(msg, callback);
|
||||
expect(callbackSpy.called).to.be.true;
|
||||
});
|
||||
|
||||
it('should callback if error', () => {
|
||||
msg.content = JSON.stringify({
|
||||
err: true,
|
||||
response: 'test',
|
||||
isDisposed: false,
|
||||
});
|
||||
client.handleMessage(msg, callback);
|
||||
expect(callbackSpy.called).to.be.true;
|
||||
});
|
||||
|
||||
it('should callback if isDisposed', () => {
|
||||
msg.content = JSON.stringify({
|
||||
err: null,
|
||||
response: 'test',
|
||||
isDisposed: true,
|
||||
});
|
||||
client.handleMessage(msg, callback);
|
||||
expect(callbackSpy.called).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe('close', () => {
|
||||
let channelCloseSpy: sinon.SinonSpy;
|
||||
let clientCloseSpy: sinon.SinonSpy;
|
||||
beforeEach(() => {
|
||||
channelCloseSpy = sinon.spy();
|
||||
clientCloseSpy = sinon.spy();
|
||||
(client as any).channel = { close: channelCloseSpy };
|
||||
(client as any).client = { close: clientCloseSpy };
|
||||
});
|
||||
|
||||
it('should close channel when it is not null', () => {
|
||||
client.close();
|
||||
expect(channelCloseSpy.called).to.be.true;
|
||||
});
|
||||
|
||||
it('should close client when it is not null', () => {
|
||||
client.close();
|
||||
expect(clientCloseSpy.called).to.be.true;
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user