From 91be484b83a7c792100035e9e1dafc713252c681 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20My=C5=9Bliwiec?= Date: Wed, 3 Oct 2018 23:26:52 +0200 Subject: [PATCH] refactor(microservices) partially fixed rabbitmq transport (+tests) --- .../microservice-configuration.interface.ts | 7 +- packages/microservices/client/client-rmq.ts | 279 +++++++++-------- packages/microservices/client/constants.ts | 2 +- .../microservice-configuration.interface.ts | 7 +- packages/microservices/server/server-rqm.ts | 176 ++++++----- .../test/client/client-rmq.spec.ts | 288 ++++++++++-------- 6 files changed, 434 insertions(+), 325 deletions(-) diff --git a/packages/common/interfaces/microservices/microservice-configuration.interface.ts b/packages/common/interfaces/microservices/microservice-configuration.interface.ts index 6544f3449..d0bc70b42 100644 --- a/packages/common/interfaces/microservices/microservice-configuration.interface.ts +++ b/packages/common/interfaces/microservices/microservice-configuration.interface.ts @@ -1,7 +1,6 @@ import { Transport } from '../../enums/transport.enum'; import { MqttClientOptions } from '../external/mqtt-options.interface'; import { CustomTransportStrategy } from './custom-transport-strategy.interface'; -import { Options } from 'amqplib'; export type MicroserviceOptions = | GrpcOptions @@ -38,7 +37,7 @@ export interface GrpcOptions { oneofs?: boolean; json?: boolean; includeDirs?: string[]; - } + }; }; } @@ -88,6 +87,6 @@ export interface RmqOptions { queue?: string; prefetchCount?: number; isGlobalPrefetchCount?: boolean; - queueOptions?: Options.AssertQueue; + queueOptions?: any; }; -} \ No newline at end of file +} diff --git a/packages/microservices/client/client-rmq.ts b/packages/microservices/client/client-rmq.ts index 162411c6f..21fd62bfe 100644 --- a/packages/microservices/client/client-rmq.ts +++ b/packages/microservices/client/client-rmq.ts @@ -1,133 +1,174 @@ -import { Connection, Options } from 'amqplib'; -import { ERROR_EVENT, RQM_DEFAULT_URL, RQM_DEFAULT_QUEUE, RQM_DEFAULT_PREFETCH_COUNT, RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT, RQM_DEFAULT_QUEUE_OPTIONS, CONNECT_EVENT, DISCONNECT_EVENT } from './../constants'; import { Logger } from '@nestjs/common/services/logger.service'; import { loadPackage } from '@nestjs/common/utils/load-package.util'; -import { ClientProxy } from './client-proxy'; -import { ClientOptions, RmqOptions } from '../interfaces'; -import { WritePacket } from './../interfaces'; -import { EventEmitter } from 'events'; +import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util'; import * as amqp from 'amqp-connection-manager'; +import { EventEmitter } from 'events'; +import { fromEvent, merge, Observable } from 'rxjs'; +import { first, map, share, switchMap } from 'rxjs/operators'; +import { ClientOptions, RmqOptions } from '../interfaces'; +import { + DISCONNECT_EVENT, + ERROR_EVENT, + RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT, + RQM_DEFAULT_PREFETCH_COUNT, + RQM_DEFAULT_QUEUE, + RQM_DEFAULT_QUEUE_OPTIONS, + RQM_DEFAULT_URL, +} from './../constants'; +import { WritePacket } from './../interfaces'; +import { ClientProxy } from './client-proxy'; let rqmPackage: any = {}; export class ClientRMQ extends ClientProxy { - private readonly logger = new Logger(ClientProxy.name); - private client: any = null; - private channel: any = null; - private urls: string[]; - private queue: string; - private prefetchCount: number; - private isGlobalPrefetchCount: boolean; - private queueOptions: Options.AssertQueue; - private replyQueue: string; - private responseEmitter: EventEmitter; + protected readonly logger = new Logger(ClientProxy.name); + protected connection: Promise; + protected client: any = null; + protected channel: any = null; + protected urls: string[]; + protected queue: string; + protected prefetchCount: number; + protected isGlobalPrefetchCount: boolean; + protected queueOptions: any; + protected replyQueue: string; + protected responseEmitter: EventEmitter; - constructor( - private readonly options: ClientOptions['options']) { - super(); - this.urls = - this.getOptionsProp(this.options, 'urls') || [RQM_DEFAULT_URL]; - this.queue = - this.getOptionsProp(this.options, 'queue') || RQM_DEFAULT_QUEUE; - this.prefetchCount = - this.getOptionsProp(this.options, 'prefetchCount') || RQM_DEFAULT_PREFETCH_COUNT; - this.isGlobalPrefetchCount = - this.getOptionsProp(this.options, 'isGlobalPrefetchCount') || RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT; - this.queueOptions = - this.getOptionsProp(this.options, 'queueOptions') || RQM_DEFAULT_QUEUE_OPTIONS; - rqmPackage = loadPackage('amqplib', ClientRMQ.name); - this.connect(); + constructor(protected readonly options: ClientOptions['options']) { + super(); + this.urls = this.getOptionsProp(this.options, 'urls') || [ + RQM_DEFAULT_URL, + ]; + this.queue = + this.getOptionsProp(this.options, 'queue') || + RQM_DEFAULT_QUEUE; + this.prefetchCount = + this.getOptionsProp(this.options, 'prefetchCount') || + RQM_DEFAULT_PREFETCH_COUNT; + this.isGlobalPrefetchCount = + this.getOptionsProp(this.options, 'isGlobalPrefetchCount') || + RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT; + this.queueOptions = + this.getOptionsProp(this.options, 'queueOptions') || + RQM_DEFAULT_QUEUE_OPTIONS; + + rqmPackage = loadPackage('amqplib', ClientRMQ.name); + } + + public close(): void { + this.channel && this.channel.close(); + this.client && this.client.close(); + } + + public listen() { + this.channel.addSetup(channel => + channel.consume( + this.replyQueue, + msg => { + this.responseEmitter.emit(msg.properties.correlationId, msg); + }, + { noAck: true }, + ), + ); + } + + public connect(): Promise { + if (this.client && this.channel) { + return this.connection; } + this.client = this.createClient(); + this.handleError(this.client); - public close(): void { - this.channel && this.channel.close(); - this.client && this.client.close(); + const connect$ = this.connect$(this.client); + this.connection = this.mergeDisconnectEvent(this.client, connect$) + .pipe( + switchMap(() => { + return new Promise(resolve => + this.client.createChannel({ + json: false, + setup: async channel => this.setupChannel(channel, resolve), + }), + ); + }), + share(), + ) + .toPromise(); + return this.connection; + } + + public createClient(): T { + return amqp.connect(this.urls) as T; + } + + public mergeDisconnectEvent( + instance: any, + source$: Observable, + ): Observable { + const close$ = fromEvent(instance, DISCONNECT_EVENT).pipe( + map(err => { + throw err; + }), + ); + return merge(source$, close$).pipe(first()); + } + + public async setupChannel(channel: any, resolve: Function) { + await channel.assertQueue(this.queue, this.queueOptions); + await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount); + this.replyQueue = (await channel.assertQueue('', { + exclusive: true, + })).queue; + + this.responseEmitter = new EventEmitter(); + this.responseEmitter.setMaxListeners(0); + this.listen(); + + resolve(); + } + + protected publish( + message: any, + callback: (packet: WritePacket) => any, + ): Function { + try { + const correlationId = randomStringGenerator(); + const listener = msg => { + const { content } = msg; + this.handleMessage(content, callback); + }; + this.responseEmitter.on(correlationId, listener); + this.channel.sendToQueue( + this.queue, + Buffer.from(JSON.stringify(message)), + { + replyTo: this.replyQueue, + correlationId, + }, + ); + return () => this.responseEmitter.removeListener(correlationId, listener); + } catch (err) { + callback({ err }); } + } - public listen() { - this.channel.addSetup((channel) => { - return Promise.all([ - channel.consume(this.replyQueue, (msg) => { - this.responseEmitter.emit(msg.properties.correlationId, msg); - }, { noAck: true }), - ]); - }); + public handleMessage( + packet: WritePacket, + callback: (packet: WritePacket) => any, + ) { + const { err, response, isDisposed } = packet; + if (isDisposed || err) { + callback({ + err, + response: null, + isDisposed: true, + }); } + callback({ + err, + response, + }); + } - public connect(): Promise { - if (this.client && this.channel) { - return Promise.resolve(); - } - return new Promise(async (resolve, reject) => { - this.client = amqp.connect(this.urls); - this.client.on(CONNECT_EVENT, x => { - this.channel = this.client.createChannel({ - json: false, - setup: async (channel) => { - await channel.assertQueue(this.queue, this.queueOptions); - await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount); - this.replyQueue = (await channel.assertQueue('', { exclusive: true })).queue; - this.responseEmitter = new EventEmitter(); - this.responseEmitter.setMaxListeners(0); - this.listen(); - resolve(); - }, - }); - }); - this.client.on(DISCONNECT_EVENT, err => { - reject(err); - this.client.close(); - this.client = null; - }); - }); - } - - protected publish(messageObj, callback: (packet: WritePacket) => any) { - if (!this.client) { - this.connect().then(x => { - this.sendMessage(messageObj, callback); - }); - } else { - this.sendMessage(messageObj, callback); - } - } - - private sendMessage(messageObj, callback: (packet: WritePacket) => any) { - try { - const correlationId = Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15); - this.responseEmitter.on(correlationId, msg => { - const { content } = msg; - this.handleMessage(content, callback); - }); - this.channel.sendToQueue(this.queue, Buffer.from(JSON.stringify(messageObj)), { - replyTo: this.replyQueue, - correlationId, - }); - } catch (err) { - this.logger.error(err); - callback({ err }); - } - } - - public handleMessage( - msg: WritePacket, - callback: (packet: WritePacket) => any, - ) { - const { err, response, isDisposed } = JSON.parse(msg.toString()); - if (isDisposed || err) { - callback({ - err, - response: null, - isDisposed: true, - }); - } - callback({ - err, - response, - }); - } - - public handleError(client: Connection): void { - client.addListener(ERROR_EVENT, err => this.logger.error(err)); - } -} \ No newline at end of file + public handleError(client: any): void { + client.addListener(ERROR_EVENT, err => this.logger.error(err)); + } +} diff --git a/packages/microservices/client/constants.ts b/packages/microservices/client/constants.ts index 95588f3df..d501c2276 100644 --- a/packages/microservices/client/constants.ts +++ b/packages/microservices/client/constants.ts @@ -1,3 +1,3 @@ export const ECONNREFUSED = 'ECONNREFUSED'; export const CONN_ERR = 'CONN_ERR'; -export const GRPC_CANCELLED = 'Cancelled'; \ No newline at end of file +export const GRPC_CANCELLED = 'Cancelled'; diff --git a/packages/microservices/interfaces/microservice-configuration.interface.ts b/packages/microservices/interfaces/microservice-configuration.interface.ts index d287f8426..9c2d5c306 100644 --- a/packages/microservices/interfaces/microservice-configuration.interface.ts +++ b/packages/microservices/interfaces/microservice-configuration.interface.ts @@ -1,7 +1,6 @@ import { MqttClientOptions } from '@nestjs/common/interfaces/external/mqtt-options.interface'; import { Transport } from '../enums/transport.enum'; import { Server } from './../server/server'; -import { Options } from 'amqplib'; import { CustomTransportStrategy } from './custom-transport-strategy.interface'; export type MicroserviceOptions = @@ -39,7 +38,7 @@ export interface GrpcOptions { oneofs?: boolean; json?: boolean; includeDirs?: string[]; - } + }; }; } @@ -89,6 +88,6 @@ export interface RmqOptions { queue?: string; prefetchCount?: number; isGlobalPrefetchCount?: boolean; - queueOptions?: Options.AssertQueue; + queueOptions?: any; }; -} \ No newline at end of file +} diff --git a/packages/microservices/server/server-rqm.ts b/packages/microservices/server/server-rqm.ts index 5376abfb0..fc1153774 100644 --- a/packages/microservices/server/server-rqm.ts +++ b/packages/microservices/server/server-rqm.ts @@ -1,81 +1,115 @@ -import { Server } from './server'; -import { Options } from 'amqplib'; -import { DISCONNECTED_RMQ_MESSAGE, DISCONNECT_EVENT, RQM_DEFAULT_URL, RQM_DEFAULT_QUEUE, RQM_DEFAULT_PREFETCH_COUNT, RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT, RQM_DEFAULT_QUEUE_OPTIONS, CONNECT_EVENT } from './../constants'; -import { CustomTransportStrategy, RmqOptions } from './../interfaces'; -import { MicroserviceOptions } from '../interfaces/microservice-configuration.interface'; import { loadPackage } from '@nestjs/common/utils/load-package.util'; -import { Observable } from 'rxjs'; import * as amqp from 'amqp-connection-manager'; +import { Observable } from 'rxjs'; +import { MicroserviceOptions } from '../interfaces/microservice-configuration.interface'; +import { + CONNECT_EVENT, + DISCONNECTED_RMQ_MESSAGE, + DISCONNECT_EVENT, + NO_PATTERN_MESSAGE, + RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT, + RQM_DEFAULT_PREFETCH_COUNT, + RQM_DEFAULT_QUEUE, + RQM_DEFAULT_QUEUE_OPTIONS, + RQM_DEFAULT_URL, +} from './../constants'; +import { CustomTransportStrategy, RmqOptions } from './../interfaces'; +import { Server } from './server'; let rqmPackage: any = {}; export class ServerRMQ extends Server implements CustomTransportStrategy { - private server: any = null; - private channel: any = null; - private urls: string[]; - private queue: string; - private prefetchCount: number; - private queueOptions: Options.AssertQueue; - private isGlobalPrefetchCount: boolean; + private server: any = null; + private channel: any = null; + private urls: string[]; + private queue: string; + private prefetchCount: number; + private queueOptions: any; + private isGlobalPrefetchCount: boolean; - constructor(private readonly options: MicroserviceOptions) { - super(); - this.urls = - this.getOptionsProp(this.options, 'urls') || [RQM_DEFAULT_URL]; - this.queue = - this.getOptionsProp(this.options, 'queue') || RQM_DEFAULT_QUEUE; - this.prefetchCount = - this.getOptionsProp(this.options, 'prefetchCount') || RQM_DEFAULT_PREFETCH_COUNT; - this.isGlobalPrefetchCount = - this.getOptionsProp(this.options, 'isGlobalPrefetchCount') || RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT; - this.queueOptions = - this.getOptionsProp(this.options, 'queueOptions') || RQM_DEFAULT_QUEUE_OPTIONS; - rqmPackage = loadPackage('amqplib', ServerRMQ.name); + constructor(private readonly options: MicroserviceOptions) { + super(); + this.urls = this.getOptionsProp(this.options, 'urls') || [ + RQM_DEFAULT_URL, + ]; + this.queue = + this.getOptionsProp(this.options, 'queue') || + RQM_DEFAULT_QUEUE; + this.prefetchCount = + this.getOptionsProp(this.options, 'prefetchCount') || + RQM_DEFAULT_PREFETCH_COUNT; + this.isGlobalPrefetchCount = + this.getOptionsProp(this.options, 'isGlobalPrefetchCount') || + RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT; + this.queueOptions = + this.getOptionsProp(this.options, 'queueOptions') || + RQM_DEFAULT_QUEUE_OPTIONS; + + rqmPackage = loadPackage('amqplib', ServerRMQ.name); + } + + public async listen(callback: () => void): Promise { + await this.start(callback); + } + + public close(): void { + this.channel && this.channel.close(); + this.server && this.server.close(); + } + + private async start(callback?: () => void) { + this.server = amqp.connect(this.urls); + this.server.on(CONNECT_EVENT, _ => { + this.channel = this.server.createChannel({ + json: false, + setup: async channel => { + await channel.assertQueue(this.queue, this.queueOptions); + await channel.prefetch( + this.prefetchCount, + this.isGlobalPrefetchCount, + ); + channel.consume(this.queue, msg => this.handleMessage(msg), { + noAck: true, + }); + callback(); + }, + }); + }); + + this.server.on(DISCONNECT_EVENT, err => { + this.logger.error(DISCONNECTED_RMQ_MESSAGE); + }); + } + + private async handleMessage(message: any): Promise { + const { content, properties } = message; + const packet = JSON.parse(content.toString()); + const pattern = JSON.stringify(packet.pattern); + const handler = this.getHandlerByPattern(pattern); + + if (!handler) { + const status = 'error'; + return this.sendMessage( + { status, err: NO_PATTERN_MESSAGE }, + properties.replyTo, + properties.correlationId, + ); } + const response$ = this.transformToObservable( + await handler(packet.data), + ) as Observable; + response$ && + this.send(response$, data => + this.sendMessage(data, properties.replyTo, properties.correlationId), + ); + } - public async listen(callback: () => void): Promise { - await this.start(callback); - } - - public close(): void { - this.channel && this.channel.close(); - this.server && this.server.close(); - } - - private async start(callback?: () => void) { - this.server = amqp.connect(this.urls); - this.server.on(CONNECT_EVENT, x => { - this.channel = this.server.createChannel({ - json: false, - setup: async (channel) => { - await channel.assertQueue(this.queue, this.queueOptions); - await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount); - channel.consume(this.queue, (msg) => this.handleMessage(msg), { noAck: true }); - callback(); - }, - }); - }); - - this.server.on(DISCONNECT_EVENT, err => { - this.logger.error(DISCONNECTED_RMQ_MESSAGE); - }); - } - - private async handleMessage(message): Promise { - const { content, properties } = message; - const messageObj = JSON.parse(content.toString()); - const handlers = this.getHandlers(); - const pattern = JSON.stringify(messageObj.pattern); - if (!this.messageHandlers[pattern]) { - return; - } - const handler = this.messageHandlers[pattern]; - const response$ = this.transformToObservable(await handler(messageObj.data)) as Observable; - response$ && this.send(response$, (data) => this.sendMessage(data, properties.replyTo, properties.correlationId)); - } - - private sendMessage(message, replyTo, correlationId): void { - const buffer = Buffer.from(JSON.stringify(message)); - this.channel.sendToQueue(replyTo, buffer, { correlationId }); - } + private sendMessage( + message: T, + replyTo: any, + correlationId: string, + ): void { + const buffer = Buffer.from(JSON.stringify(message)); + this.channel.sendToQueue(replyTo, buffer, { correlationId }); + } } diff --git a/packages/microservices/test/client/client-rmq.spec.ts b/packages/microservices/test/client/client-rmq.spec.ts index 228df55f9..0df781af2 100644 --- a/packages/microservices/test/client/client-rmq.spec.ts +++ b/packages/microservices/test/client/client-rmq.spec.ts @@ -1,145 +1,181 @@ -import * as sinon from 'sinon'; import { expect } from 'chai'; -import { ClientRMQ } from '../../client/client-rmq'; import { EventEmitter } from 'events'; +import { empty } from 'rxjs'; +import * as sinon from 'sinon'; +import { ClientRMQ } from '../../client/client-rmq'; +// tslint:disable:no-string-literal describe('ClientRQM', () => { - const test = 'test'; - const client = new ClientRMQ({}); + const client = new ClientRMQ({}); - describe('connect', () => { - let createChannelSpy: sinon.SinonSpy, - assertQueueSpy: sinon.SinonSpy, - listenSpy: sinon.SinonSpy; + describe('connect', () => { + let createClientStub: sinon.SinonStub; + let handleErrorsSpy: sinon.SinonSpy; + let connect$Stub: sinon.SinonStub; + let mergeDisconnectEvent: sinon.SinonStub; - beforeEach( async () => { - (client as any).client = { - createChannel: createChannelSpy, - }; - (client as any).channel = { - assertQueue: assertQueueSpy - }; - (client as any).listen = listenSpy; - }); + beforeEach(async () => { + createClientStub = sinon.stub(client, 'createClient').callsFake(() => ({ + addListener: () => ({}), + removeListener: () => ({}), + })); + handleErrorsSpy = sinon.spy(client, 'handleError'); + connect$Stub = sinon.stub(client, 'connect$').callsFake(() => ({ + subscribe: resolve => resolve(), + toPromise() { + return this; + }, + pipe() { + return this; + }, + })); + mergeDisconnectEvent = sinon + .stub(client, 'mergeDisconnectEvent') + .callsFake((_, source) => source); + }); + afterEach(() => { + createClientStub.restore(); + handleErrorsSpy.restore(); + connect$Stub.restore(); + mergeDisconnectEvent.restore(); + }); + describe('when is not connected', () => { + beforeEach(async () => { + client['mqttClient'] = null; + await client.connect(); + }); + it('should call "handleError" once', async () => { + expect(handleErrorsSpy.called).to.be.true; + }); + it('should call "createClient" once', async () => { + expect(createClientStub.called).to.be.true; + }); + it('should call "connect$" once', async () => { + expect(connect$Stub.called).to.be.true; + }); + }); + describe('when is connected', () => { + beforeEach(() => { + client['client'] = { test: true } as any; + }); + it('should not call "createClient"', () => { + expect(createClientStub.called).to.be.false; + }); + it('should not call "handleError"', () => { + expect(handleErrorsSpy.called).to.be.false; + }); + it('should not call "connect$"', () => { + expect(connect$Stub.called).to.be.false; + }); + }); + }); - it('should create channel on connect()', () => { - client['connect']().then(() => { - expect(createChannelSpy.called).to.be.true; - }); - }); + describe('mergeDisconnectEvent', () => { + it('should merge disconnect event', () => { + const error = new Error(); + const instance: any = { + on: (ev, callback) => callback(error), + off: () => ({}), + }; + client + .mergeDisconnectEvent(instance as any, empty()) + .subscribe(null, err => expect(err).to.be.eql(error)); + }); + }); - it('should create assert queues on connect()', () => { - client['connect']().then(() => { - expect(assertQueueSpy.called).to.be.true; - }); - }); + describe('publish', () => { + const pattern = 'test'; + const msg = { pattern, data: 'data' }; + let connectSpy: sinon.SinonSpy, + sendToQueueSpy: sinon.SinonSpy, + eventSpy: sinon.SinonSpy; - it('should call listen() on connect()', () => { - client['connect']().then(() => { - expect(listenSpy.called).to.be.true; - }); - }); + beforeEach(() => { + connectSpy = sinon.spy(client, 'connect'); + eventSpy = sinon.spy(); + sendToQueueSpy = sinon.spy(); + + (client as any).client = {}; + (client as any).channel = { + sendToQueue: sendToQueueSpy, + }; + (client as any).responseEmitter = new EventEmitter(); + (client as any).responseEmitter.on('test', eventSpy); }); - describe('publish', () => { - const pattern = 'test'; - const msg = { pattern, data: 'data' }; - let connectSpy: sinon.SinonSpy, - sendToQueueSpy: sinon.SinonSpy, - eventSpy: sinon.SinonSpy; - - beforeEach(() => { - connectSpy = sinon.spy(client, 'connect'); - eventSpy = sinon.spy(); - - (client as any).client = {}; - (client as any).channel = { - sendToQueue: sendToQueueSpy - }; - (client as any).responseEmitter = new EventEmitter(); - (client as any).responseEmitter.on('test', eventSpy) - }); - - afterEach(() => { - connectSpy.restore(); - }); - - it('should not call "connect()" when client not null', () => { - client['publish'](msg, () => {}); - expect(connectSpy.called).to.be.false; - }); - - it('should call "connect()" when client is null', () => { - (client as any).client = null; - client['publish'](msg, () => {}); - expect(connectSpy.called).to.be.true; - }); - - it('should invoke callback on event', () => { - (client as any).client = null; - client['publish'](msg, () => { - (client as any).responseEmitter.emit('test'); - }); - expect(eventSpy.called).to.be.true; - }); - - it('should send message', () => { - client['publish'](msg, () => { - expect(sendToQueueSpy.called).to.be.true; - }); - }); + afterEach(() => { + connectSpy.restore(); }); - describe('handleMessage', () => { - const msg = {}; - let callbackSpy: sinon.SinonSpy; - let deleteQueueSpy: sinon.SinonSpy; - let callback = (data) => {}; + it('should send message', () => { + client['publish'](msg, () => { + expect(sendToQueueSpy.called).to.be.true; + }); + }); + }); - beforeEach(() => { - callbackSpy = sinon.spy(); - deleteQueueSpy = sinon.spy(); - (client as any).channel = { deleteQueue: deleteQueueSpy }; - callback = callbackSpy; - }); + describe('handleMessage', () => { + const msg: any = {}; + let callbackSpy: sinon.SinonSpy; + let deleteQueueSpy: sinon.SinonSpy; + let callback = data => {}; - it('should callback if no error or isDisposed', () => { - msg.content = JSON.stringify({ err: null, response: 'test', isDisposed: false }); - client['handleMessage'](msg, callback); - expect(callbackSpy.called).to.be.true; - }); - - it('should callback if error', () => { - msg.content = JSON.stringify({ err: true, response: 'test', isDisposed: false }); - client['handleMessage'](msg, callback); - expect(callbackSpy.called).to.be.true; - }); - - it('should callback if isDisposed', () => { - msg.content = JSON.stringify({ err: null, response: 'test', isDisposed: true }); - client['handleMessage'](msg, callback); - expect(callbackSpy.called).to.be.true; - }); + beforeEach(() => { + callbackSpy = sinon.spy(); + deleteQueueSpy = sinon.spy(); + (client as any).channel = { deleteQueue: deleteQueueSpy }; + callback = callbackSpy; }); - describe('close', () => { - let channelCloseSpy: sinon.SinonSpy; - let clientCloseSpy: sinon.SinonSpy; - beforeEach(() => { - channelCloseSpy = sinon.spy(); - clientCloseSpy = sinon.spy(); - (client as any).channel = { close: channelCloseSpy }; - (client as any).client = { close: clientCloseSpy }; - }); - - it('should close channel when it is not null', () => { - client.close(); - expect(channelCloseSpy.called).to.be.true; - }); - - it('should close client when it is not null', () => { - client.close(); - expect(clientCloseSpy.called).to.be.true; - }); + it('should callback if no error or isDisposed', () => { + msg.content = JSON.stringify({ + err: null, + response: 'test', + isDisposed: false, + }); + client.handleMessage(msg, callback); + expect(callbackSpy.called).to.be.true; }); + + it('should callback if error', () => { + msg.content = JSON.stringify({ + err: true, + response: 'test', + isDisposed: false, + }); + client.handleMessage(msg, callback); + expect(callbackSpy.called).to.be.true; + }); + + it('should callback if isDisposed', () => { + msg.content = JSON.stringify({ + err: null, + response: 'test', + isDisposed: true, + }); + client.handleMessage(msg, callback); + expect(callbackSpy.called).to.be.true; + }); + }); + + describe('close', () => { + let channelCloseSpy: sinon.SinonSpy; + let clientCloseSpy: sinon.SinonSpy; + beforeEach(() => { + channelCloseSpy = sinon.spy(); + clientCloseSpy = sinon.spy(); + (client as any).channel = { close: channelCloseSpy }; + (client as any).client = { close: clientCloseSpy }; + }); + + it('should close channel when it is not null', () => { + client.close(); + expect(channelCloseSpy.called).to.be.true; + }); + + it('should close client when it is not null', () => { + client.close(); + expect(clientCloseSpy.called).to.be.true; + }); + }); });