bugfix(microservices) fix rabbit mq transport, code refactor, add tests

This commit is contained in:
Kamil Myśliwiec
2018-10-05 22:42:35 +02:00
parent 91be484b83
commit 096336c243
13 changed files with 459 additions and 188 deletions

View File

@@ -36,4 +36,11 @@ services:
environment:
- MONGODB_DATABASE="test"
ports:
- 27017:27017
- 27017:27017
rabbit:
hostname: rabbit
image: "rabbitmq:management"
ports:
- "15672:15672"
- "5672:5672"
tty: true

View File

@@ -1,49 +0,0 @@
import * as express from 'express';
import * as request from 'supertest';
import { Test } from '@nestjs/testing';
import { INestApplication } from '@nestjs/common';
import { Transport } from '@nestjs/microservices';
import { RMQController } from '../src/rmq/rmq.controller';
import { RMQBroadcastController } from '../src/rmq/rmq-broadcast.controller';
describe('RabbitMQ transport', () => {
let server;
let app: INestApplication;
beforeEach(async () => {
const module = await Test.createTestingModule({
controllers: [RMQBroadcastController],
}).compile();
server = express();
app = module.createNestApplication(server);
app.connectMicroservice({
transport: Transport.RMQ,
options: {
urls: [`amqp://admin:admin@localhost`],
queue: 'test',
queueOptions: { durable: false },
},
});
app.connectMicroservice({
transport: Transport.RMQ,
options: {
urls: [`amqp://admin:admin@localhost`],
queue: 'test',
queueOptions: { durable: false },
},
});
await app.startAllMicroservicesAsync();
await app.init();
});
it(`Broadcast (2 subscribers)`, () => {
return request(server)
.get('/broadcast')
.expect(200, '2');
});
afterEach(async () => {
await app.close();
});
});

View File

@@ -1,8 +1,8 @@
import * as express from 'express';
import * as request from 'supertest';
import { Test } from '@nestjs/testing';
import { INestApplication } from '@nestjs/common';
import { Transport } from '@nestjs/microservices';
import { Test } from '@nestjs/testing';
import * as express from 'express';
import * as request from 'supertest';
import { RMQController } from '../src/rmq/rmq.controller';
describe('RabbimMQ transport', () => {
@@ -17,16 +17,16 @@ describe('RabbimMQ transport', () => {
server = express();
app = module.createNestApplication(server);
app.connectMicroservice({
transport: Transport.RMQ,
options: {
urls: [`amqp://admin:admin@localhost`],
queue: 'test',
queueOptions: { durable: false },
},
transport: Transport.RMQ,
options: {
urls: [`amqp://localhost:5672`],
queue: 'test',
queueOptions: { durable: false },
},
});
await app.startAllMicroservicesAsync();
await app.init();
});
});
it(`/POST`, () => {
return request(server)
@@ -54,11 +54,16 @@ describe('RabbimMQ transport', () => {
return request(server)
.post('/concurrent')
.send([
[1, 2, 3, 4, 5],
[6, 7, 8, 9, 10],
[11, 12, 13, 14, 15],
[16, 17, 18, 19, 20],
[21, 22, 23, 24, 25],
Array.from({ length: 10 }, (v, k) => k + 1),
Array.from({ length: 10 }, (v, k) => k + 11),
Array.from({ length: 10 }, (v, k) => k + 21),
Array.from({ length: 10 }, (v, k) => k + 31),
Array.from({ length: 10 }, (v, k) => k + 41),
Array.from({ length: 10 }, (v, k) => k + 51),
Array.from({ length: 10 }, (v, k) => k + 61),
Array.from({ length: 10 }, (v, k) => k + 71),
Array.from({ length: 10 }, (v, k) => k + 81),
Array.from({ length: 10 }, (v, k) => k + 91),
])
.expect(200, 'true');
});

View File

@@ -7,27 +7,25 @@ import {
} from '@nestjs/common';
import { ClientProxyFactory } from '@nestjs/microservices';
import { Observable, throwError } from 'rxjs';
import { catchError, tap } from 'rxjs/operators';
import { catchError } from 'rxjs/operators';
@Controller()
export class DisconnectedClientController {
@Post()
call(@Body() options): Observable<number> {
const client = ClientProxyFactory.create(options);
return client
.send<number, number[]>({ cmd: 'none' }, [1, 2, 3])
.pipe(
tap(
return client.send<number, number[]>({ cmd: 'none' }, [1, 2, 3]).pipe(
/*tap(
console.log.bind(console, 'data'),
console.error.bind(console, 'error'),
),*/
catchError(({ code }) =>
throwError(
code === 'ECONNREFUSED' || code === 'CONN_ERR'
? new RequestTimeoutException('ECONNREFUSED')
: new InternalServerErrorException(),
),
catchError(({ code }) =>
throwError(
code === 'ECONNREFUSED' || code === 'CONN_ERR'
? new RequestTimeoutException('ECONNREFUSED')
: new InternalServerErrorException(),
),
),
);
),
);
}
}

View File

@@ -1,10 +1,9 @@
import { Controller, Get } from '@nestjs/common';
import {
Client,
MessagePattern,
ClientProxy,
Transport,
ClientProxyFactory,
MessagePattern,
Transport,
} from '@nestjs/microservices';
import { Observable } from 'rxjs';
import { scan, take } from 'rxjs/operators';
@@ -15,13 +14,13 @@ export class RMQBroadcastController {
constructor() {
this.client = ClientProxyFactory.create({
transport: Transport.RMQ,
options: {
urls: [`amqp://admin:admin@localhost`],
queue: 'test',
queueOptions: { durable: false },
},
});
transport: Transport.RMQ,
options: {
urls: [`amqp://localhost:5672`],
queue: 'test_broadcast',
queueOptions: { durable: false },
},
});
}
@Get('broadcast')

View File

@@ -1,12 +1,11 @@
import { Controller, Get, Post, Body, Query, HttpCode } from '@nestjs/common';
import { Body, Controller, HttpCode, Post, Query } from '@nestjs/common';
import {
Client,
MessagePattern,
ClientProxy,
Transport,
ClientProxyFactory,
MessagePattern,
Transport,
} from '@nestjs/microservices';
import { Observable, of, from } from 'rxjs';
import { from, Observable, of } from 'rxjs';
import { scan } from 'rxjs/operators';
@Controller()
@@ -15,13 +14,13 @@ export class RMQController {
constructor() {
this.client = ClientProxyFactory.create({
transport: Transport.RMQ,
options: {
urls: [`amqp://admin:admin@localhost`],
queue: 'test',
queueOptions: { durable: false },
},
});
transport: Transport.RMQ,
options: {
urls: [`amqp://localhost:5672`],
queue: 'test',
queueOptions: { durable: false },
},
});
}
@Post()

View File

@@ -4,5 +4,5 @@ export enum Transport {
NATS,
MQTT,
GRPC,
RMQ
}
RMQ,
}

View File

@@ -59,20 +59,18 @@ export class ClientRMQ extends ClientProxy {
this.client && this.client.close();
}
public listen() {
public consumeChannel() {
this.channel.addSetup(channel =>
channel.consume(
this.replyQueue,
msg => {
this.responseEmitter.emit(msg.properties.correlationId, msg);
},
msg => this.responseEmitter.emit(msg.properties.correlationId, msg),
{ noAck: true },
),
);
}
public connect(): Promise<any> {
if (this.client && this.channel) {
if (this.client) {
return this.connection;
}
this.client = this.createClient();
@@ -80,21 +78,20 @@ export class ClientRMQ extends ClientProxy {
const connect$ = this.connect$(this.client);
this.connection = this.mergeDisconnectEvent(this.client, connect$)
.pipe(
switchMap(() => {
return new Promise(resolve =>
this.client.createChannel({
json: false,
setup: async channel => this.setupChannel(channel, resolve),
}),
);
}),
share(),
)
.pipe(switchMap(() => this.createChannel()), share())
.toPromise();
return this.connection;
}
public createChannel(): Promise<void> {
return new Promise(resolve => {
this.channel = this.client.createChannel({
json: false,
setup: channel => this.setupChannel(channel, resolve),
});
});
}
public createClient<T = any>(): T {
return amqp.connect(this.urls) as T;
}
@@ -114,13 +111,14 @@ export class ClientRMQ extends ClientProxy {
public async setupChannel(channel: any, resolve: Function) {
await channel.assertQueue(this.queue, this.queueOptions);
await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
this.replyQueue = (await channel.assertQueue('', {
exclusive: true,
})).queue;
this.responseEmitter = new EventEmitter();
this.responseEmitter.setMaxListeners(0);
this.listen();
this.consumeChannel();
resolve();
}
@@ -131,10 +129,9 @@ export class ClientRMQ extends ClientProxy {
): Function {
try {
const correlationId = randomStringGenerator();
const listener = msg => {
const { content } = msg;
this.handleMessage(content, callback);
};
const listener = ({ content }) =>
this.handleMessage(JSON.parse(content.toString()), callback);
this.responseEmitter.on(correlationId, listener);
this.channel.sendToQueue(
this.queue,

View File

@@ -6,7 +6,7 @@ import { ServerMqtt } from './server-mqtt';
import { ServerNats } from './server-nats';
import { ServerRedis } from './server-redis';
import { ServerTCP } from './server-tcp';
import { ServerRMQ } from './server-rqm';
import { ServerRMQ } from './server-rmq';
export class ServerFactory {
public static create(

View File

@@ -1,7 +1,6 @@
import { loadPackage } from '@nestjs/common/utils/load-package.util';
import * as amqp from 'amqp-connection-manager';
import { Observable } from 'rxjs';
import { MicroserviceOptions } from '../interfaces/microservice-configuration.interface';
import {
CONNECT_EVENT,
DISCONNECTED_RMQ_MESSAGE,
@@ -12,8 +11,9 @@ import {
RQM_DEFAULT_QUEUE,
RQM_DEFAULT_QUEUE_OPTIONS,
RQM_DEFAULT_URL,
} from './../constants';
import { CustomTransportStrategy, RmqOptions } from './../interfaces';
} from '../constants';
import { CustomTransportStrategy, RmqOptions } from '../interfaces';
import { MicroserviceOptions } from '../interfaces/microservice-configuration.interface';
import { Server } from './server';
let rqmPackage: any = {};
@@ -57,31 +57,33 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
this.server && this.server.close();
}
private async start(callback?: () => void) {
this.server = amqp.connect(this.urls);
public async start(callback?: () => void) {
this.server = this.createClient();
this.server.on(CONNECT_EVENT, _ => {
this.channel = this.server.createChannel({
json: false,
setup: async channel => {
await channel.assertQueue(this.queue, this.queueOptions);
await channel.prefetch(
this.prefetchCount,
this.isGlobalPrefetchCount,
);
channel.consume(this.queue, msg => this.handleMessage(msg), {
noAck: true,
});
callback();
},
setup: channel => this.setupChannel(channel, callback),
});
});
this.server.on(DISCONNECT_EVENT, err => {
this.logger.error(DISCONNECTED_RMQ_MESSAGE);
});
}
private async handleMessage(message: any): Promise<void> {
public createClient<T = any>(): T {
return amqp.connect(this.urls);
}
public async setupChannel(channel: any, callback: Function) {
await channel.assertQueue(this.queue, this.queueOptions);
await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
channel.consume(this.queue, msg => this.handleMessage(msg), {
noAck: true,
});
callback();
}
public async handleMessage(message: any): Promise<void> {
const { content, properties } = message;
const packet = JSON.parse(content.toString());
const pattern = JSON.stringify(packet.pattern);
@@ -98,13 +100,14 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
const response$ = this.transformToObservable(
await handler(packet.data),
) as Observable<any>;
response$ &&
this.send(response$, data =>
this.sendMessage(data, properties.replyTo, properties.correlationId),
);
const publish = data =>
this.sendMessage(data, properties.replyTo, properties.correlationId);
response$ && this.send(response$, publish);
}
private sendMessage<T = any>(
public sendMessage<T = any>(
message: T,
replyTo: any,
correlationId: string,

View File

@@ -41,7 +41,7 @@ describe('ClientRQM', () => {
});
describe('when is not connected', () => {
beforeEach(async () => {
client['mqttClient'] = null;
client['client'] = null;
await client.connect();
});
it('should call "handleError" once', async () => {
@@ -57,6 +57,7 @@ describe('ClientRQM', () => {
describe('when is connected', () => {
beforeEach(() => {
client['client'] = { test: true } as any;
client['channel'] = { test: true };
});
it('should not call "createClient"', () => {
expect(createClientStub.called).to.be.false;
@@ -70,6 +71,99 @@ describe('ClientRQM', () => {
});
});
describe('createChannel', () => {
let createChannelStub: sinon.SinonStub;
let setupChannelStub: sinon.SinonStub;
beforeEach(() => {
setupChannelStub = sinon
.stub(client, 'setupChannel')
.callsFake((_, done) => done());
createChannelStub = sinon.stub().callsFake(({ setup }) => setup());
client['client'] = { createChannel: createChannelStub };
});
afterEach(() => {
setupChannelStub.restore();
});
it('should call "createChannel" method of the client instance', async () => {
await client.createChannel();
expect(createChannelStub.called).to.be.true;
});
it('should call "setupChannel" method of the client instance', async () => {
await client.createChannel();
expect(setupChannelStub.called).to.be.true;
});
});
describe('consumeChannel', () => {
let addSetupStub: sinon.SinonStub;
let consumeStub: sinon.SinonStub;
const channel: any = {};
beforeEach(() => {
client['responseEmitter'] = new EventEmitter();
consumeStub = sinon
.stub()
.callsFake((_, done) => done({ properties: { correlationId: 1 } }));
addSetupStub = sinon.stub().callsFake(fn => fn(channel));
channel.consume = consumeStub;
client['channel'] = { addSetup: addSetupStub };
});
it('should call "addSetup" method of the channel instance', async () => {
await client.consumeChannel();
expect(addSetupStub.called).to.be.true;
});
it('should call "consume" method of the channel instance', async () => {
await client.consumeChannel();
expect(consumeStub.called).to.be.true;
});
});
describe('setupChannel', () => {
const queue = 'test';
const queueOptions = {};
const isGlobalPrefetchCount = true;
const prefetchCount = 10;
let consumeStub: sinon.SinonStub;
let channel: any = {};
beforeEach(() => {
client['queue'] = queue;
client['queueOptions'] = queueOptions;
client['isGlobalPrefetchCount'] = isGlobalPrefetchCount;
client['prefetchCount'] = prefetchCount;
channel = {
assertQueue: sinon.spy(() => ({})),
prefetch: sinon.spy(),
};
consumeStub = sinon.stub(client, 'consumeChannel').callsFake(() => null);
});
afterEach(() => {
consumeStub.restore();
});
it('should call "assertQueue" with queue and queue options', async () => {
await client.setupChannel(channel, () => null);
expect(channel.assertQueue.calledWith(queue, queueOptions)).to.be.true;
});
it('should call "prefetch" with prefetchCount and "isGlobalPrefetchCount"', async () => {
await client.setupChannel(channel, () => null);
expect(channel.prefetch.calledWith(prefetchCount, isGlobalPrefetchCount))
.to.be.true;
});
it('should call "consumeChannel" method', async () => {
await client.setupChannel(channel, () => null);
expect(consumeStub.called).to.be.true;
});
it('should call "resolve" function', async () => {
const resolve = sinon.spy();
await client.setupChannel(channel, resolve);
expect(resolve.called).to.be.true;
});
});
describe('mergeDisconnectEvent', () => {
it('should merge disconnect event', () => {
const error = new Error();
@@ -95,66 +189,116 @@ describe('ClientRQM', () => {
eventSpy = sinon.spy();
sendToQueueSpy = sinon.spy();
(client as any).client = {};
(client as any).channel = {
client['channel'] = {
sendToQueue: sendToQueueSpy,
};
(client as any).responseEmitter = new EventEmitter();
(client as any).responseEmitter.on('test', eventSpy);
client['responseEmitter'] = new EventEmitter();
client['responseEmitter'].on(pattern, eventSpy);
});
afterEach(() => {
connectSpy.restore();
});
it('should send message', () => {
it('should send message to a proper queue', () => {
client['publish'](msg, () => {
expect(sendToQueueSpy.called).to.be.true;
expect(sendToQueueSpy.getCall(0).args[0]).to.be.eql(client['queue']);
});
});
it('should send buffer from stringified message', () => {
client['publish'](msg, () => {
expect(sendToQueueSpy.called).to.be.true;
expect(sendToQueueSpy.getCall(1).args[1]).to.be.eql(
Buffer.from(JSON.stringify(msg)),
);
});
});
describe('dispose callback', () => {
let unsubscribeSpy: sinon.SinonSpy, subscription;
beforeEach(async () => {
unsubscribeSpy = sinon.spy();
client['responseEmitter'] = ({
removeListener: unsubscribeSpy,
on: sinon.spy(),
} as any) as EventEmitter;
subscription = await client['publish'](msg, sinon.spy());
subscription();
});
it('should unsubscribe', () => {
expect(unsubscribeSpy.called).to.be.true;
});
});
});
describe('handleMessage', () => {
const msg: any = {};
let callbackSpy: sinon.SinonSpy;
let deleteQueueSpy: sinon.SinonSpy;
let callback = data => {};
describe('when error', () => {
let callback: sinon.SinonSpy;
beforeEach(() => {
callbackSpy = sinon.spy();
deleteQueueSpy = sinon.spy();
(client as any).channel = { deleteQueue: deleteQueueSpy };
callback = callbackSpy;
beforeEach(() => {
callback = sinon.spy();
});
it('should call callback with correct object', () => {
const packet = {
err: true,
response: 'test',
isDisposed: false,
};
client.handleMessage(packet, callback);
expect(
callback.calledWith({
err: packet.err,
response: null,
isDisposed: true,
}),
).to.be.true;
});
});
describe('when disposed', () => {
let callback: sinon.SinonSpy;
beforeEach(() => {
callback = sinon.spy();
});
it('should call callback with correct object', () => {
const packet = {
response: 'test',
isDisposed: true,
};
client.handleMessage(packet, callback);
expect(
callback.calledWith({
err: undefined,
response: null,
isDisposed: true,
}),
).to.be.true;
});
});
it('should callback if no error or isDisposed', () => {
msg.content = JSON.stringify({
err: null,
response: 'test',
isDisposed: false,
});
client.handleMessage(msg, callback);
expect(callbackSpy.called).to.be.true;
});
describe('when response', () => {
let callback: sinon.SinonSpy;
it('should callback if error', () => {
msg.content = JSON.stringify({
err: true,
response: 'test',
isDisposed: false,
beforeEach(() => {
callback = sinon.spy();
});
client.handleMessage(msg, callback);
expect(callbackSpy.called).to.be.true;
});
it('should callback if isDisposed', () => {
msg.content = JSON.stringify({
err: null,
response: 'test',
isDisposed: true,
it('should call callback with correct object', () => {
const packet = {
response: 'test',
isDisposed: false,
};
client.handleMessage(packet, callback);
expect(
callback.calledWith({
err: undefined,
response: packet.response,
}),
).to.be.true;
});
client.handleMessage(msg, callback);
expect(callbackSpy.called).to.be.true;
});
});

View File

@@ -0,0 +1,168 @@
import { expect } from 'chai';
import * as sinon from 'sinon';
import { NO_PATTERN_MESSAGE } from '../../constants';
import { ServerRMQ } from '../../server/server-rmq';
// tslint:disable:no-string-literal
describe('ServerRMQ', () => {
let server: ServerRMQ;
beforeEach(() => {
server = new ServerRMQ({});
});
describe('listen', () => {
let createClient: sinon.SinonStub;
let onStub: sinon.SinonStub;
let createChannelStub: sinon.SinonStub;
let setupChannelStub: sinon.SinonStub;
let client: any;
beforeEach(() => {
onStub = sinon
.stub()
.callsFake((event, callback) => event === 'connect' && callback());
createChannelStub = sinon.stub().callsFake(({ setup }) => setup());
setupChannelStub = sinon
.stub(server, 'setupChannel')
.callsFake(() => ({}));
client = {
on: onStub,
createChannel: createChannelStub,
};
createClient = sinon.stub(server, 'createClient').callsFake(() => client);
server.listen(null);
});
afterEach(() => {
setupChannelStub.restore();
});
it('should call "createClient"', () => {
expect(createClient.called).to.be.true;
});
it('should bind "connect" event to handler', () => {
expect(onStub.getCall(0).args[0]).to.be.equal('connect');
});
it('should bind "disconnect" event to handler', () => {
expect(onStub.getCall(1).args[0]).to.be.equal('disconnect');
});
});
describe('close', () => {
const rmqServer = { close: sinon.spy() };
const rmqChannel = { close: sinon.spy() };
beforeEach(() => {
(server as any).server = rmqServer;
(server as any).channel = rmqChannel;
});
it('should close server', () => {
server.close();
expect(rmqServer.close.called).to.be.true;
});
it('should close channel', () => {
server.close();
expect(rmqChannel.close.called).to.be.true;
});
});
describe('handleMessage', () => {
const pattern = 'test';
const msg = {
content: {
toString: () =>
JSON.stringify({
pattern,
data: 'tests',
id: '3',
}),
},
properties: { correlationId: 1 },
};
let sendMessageStub: sinon.SinonStub;
beforeEach(() => {
sendMessageStub = sinon.stub(server, 'sendMessage').callsFake(() => ({}));
});
it('should send NO_PATTERN_MESSAGE error if key does not exists in handlers object', async () => {
await server.handleMessage(msg);
expect(
sendMessageStub.calledWith({
status: 'error',
err: NO_PATTERN_MESSAGE,
}),
).to.be.true;
});
it('should call handler if exists in handlers object', async () => {
const handler = sinon.spy();
(server as any).messageHandlers = {
[JSON.stringify(pattern)]: handler as any,
};
await server.handleMessage(msg);
expect(handler.calledOnce).to.be.true;
});
});
describe('setupChannel', () => {
const queue = 'test';
const queueOptions = {};
const isGlobalPrefetchCount = true;
const prefetchCount = 10;
let channel: any = {};
beforeEach(() => {
server['queue'] = queue;
server['queueOptions'] = queueOptions;
server['isGlobalPrefetchCount'] = isGlobalPrefetchCount;
server['prefetchCount'] = prefetchCount;
channel = {
assertQueue: sinon.spy(() => ({})),
prefetch: sinon.spy(),
consume: sinon.spy(),
};
});
it('should call "assertQueue" with queue and queue options', async () => {
await server.setupChannel(channel, () => null);
expect(channel.assertQueue.calledWith(queue, queueOptions)).to.be.true;
});
it('should call "prefetch" with prefetchCount and "isGlobalPrefetchCount"', async () => {
await server.setupChannel(channel, () => null);
expect(channel.prefetch.calledWith(prefetchCount, isGlobalPrefetchCount))
.to.be.true;
});
it('should call "consumeChannel" method', async () => {
await server.setupChannel(channel, () => null);
expect(channel.consume.called).to.be.true;
});
it('should call "resolve" function', async () => {
const resolve = sinon.spy();
await server.setupChannel(channel, resolve);
expect(resolve.called).to.be.true;
});
});
describe('sendMessage', () => {
let channel: any;
beforeEach(() => {
channel = {
sendToQueue: sinon.spy(),
};
server['channel'] = channel;
});
it('should publish message to indicated queue', () => {
const message = { test: true };
const replyTo = 'test';
const correlationId = '0';
server.sendMessage(message, replyTo, correlationId);
expect(
channel.sendToQueue.calledWith(
Buffer.from(JSON.stringify(message)),
replyTo,
{ correlationId },
),
);
});
});
});

View File

@@ -56,7 +56,7 @@ describe('ServerTCP', () => {
sendMessage: sinon.spy(),
};
});
it('should send NO_PATTERN_MESSAGE error if key is not exists in handlers object', () => {
it('should send NO_PATTERN_MESSAGE error if key does not exists in handlers object', () => {
server.handleMessage(socket, msg);
expect(
socket.sendMessage.calledWith({