test for Messaging and Kafka Serializer

This commit is contained in:
Hexenon
2019-08-21 19:31:00 -06:00
parent 7cf4325472
commit dc020161b3
2 changed files with 83 additions and 46 deletions

View File

@@ -11,28 +11,12 @@ import { Test } from '@nestjs/testing';
import { expect } from 'chai';
import * as request from 'supertest';
import { KafkaController } from '../src/kafka/kafka.controller';
import { APP_FILTER } from '@nestjs/core';
import { Observable, throwError } from 'rxjs';
import { KafkaMessagesController } from '../src/kafka/kafka.messages.controller';
import { UserDto } from '../src/kafka/dtos/user.dto';
import { UserEntity } from '../src/kafka/entities/user.entity';
import { BusinessDto } from '../src/kafka/dtos/business.dto';
import { BusinessEntity } from '../src/kafka/entities/business.entity';
import * as async from 'async';
@Catch()
class KafkaExceptionFilter implements ExceptionFilter {
catch(exception: HttpException, host: ArgumentsHost): any {
throw exception;
}
}
@Catch()
class RpcErrorFilter implements RpcExceptionFilter {
catch(exception: RpcException): Observable<any> {
return throwError(exception);
}
}
describe('Kafka transport', () => {
let server;
let app: INestApplication;
@@ -43,16 +27,6 @@ describe('Kafka transport', () => {
KafkaController,
KafkaMessagesController,
],
providers: [
{
provide: APP_FILTER,
useClass: RpcErrorFilter,
},
{
provide: APP_FILTER,
useClass: KafkaExceptionFilter,
},
],
}).compile();
app = module.createNestApplication();

View File

@@ -2,6 +2,12 @@ import { expect } from 'chai';
import * as sinon from 'sinon';
import { ServerKafka } from '../../server';
import { Logger } from '@nestjs/common';
import { MessageHandler } from '@nestjs/microservices';
import { Observable } from 'rxjs';
import { Consumer } from 'kafka-node';
import { KafkaSerializer } from '../../helpers';
import { EachMessagePayload, KafkaMessage } from '@nestjs/microservices/external/kafka.interface';
import { KafkaHeaders } from '../../enums';
class NoopLogger extends Logger {
log(message: any, context?: string): void {}
@@ -12,19 +18,6 @@ class NoopLogger extends Logger {
describe('ServerKafka', () => {
let server: ServerKafka;
beforeEach(() => {
server = new ServerKafka({});
});
describe('close', () => {
it('should close server', () => {
server.close();
expect(server.consumer).to.be.null;
expect(server.producer).to.be.null;
expect(server.client).to.be.null;
});
});
let callback: sinon.SinonSpy;
let bindEventsStub: sinon.SinonStub;
let connect: sinon.SinonSpy;
@@ -34,13 +27,12 @@ describe('ServerKafka', () => {
let producerStub: sinon.SinonStub;
let client;
beforeEach(() => {
server = new ServerKafka({});
callback = sinon.spy();
connect = sinon.spy();
subscribe = sinon.spy();
run = sinon.spy();
bindEventsStub = sinon
.stub(server, 'bindEvents')
.callsFake(() => ({} as any));
consumerStub = sinon.stub(server, 'consumer')
.callsFake( () => {
return {
@@ -62,13 +54,49 @@ describe('ServerKafka', () => {
sinon.stub(server, 'createClient').callsFake(() => client);
});
const messageHandler: MessageHandler = (data): Promise<Observable<any>> => {
Logger.log('something happened');
const ob = new Observable();
ob.subscribe(obResult => {
Logger.log('something happened again');
});
return Promise.resolve(ob);
};
const messageValue = Buffer.from('test-message');
const msg: KafkaMessage = {
key: Buffer.from('test.key'),
offset: '0',
size: messageValue.length,
value: messageValue,
timestamp: Date.now().toString(),
attributes: 1,
headers: {
[KafkaHeaders.CORRELATION_ID]: Buffer.from('test-id'),
[KafkaHeaders.REPLY_TOPIC]: Buffer.from('test.key.reply'),
[KafkaHeaders.REPLY_PARTITION]: Buffer.from('1'),
}
};
describe('close', () => {
it('should close server', () => {
server.close();
expect(server.consumer).to.be.null;
expect(server.producer).to.be.null;
expect(server.client).to.be.null;
});
});
describe('listen', () => {
it('should call "bindEvents"', async () => {
bindEventsStub = sinon
.stub(server, 'bindEvents')
.callsFake(() => ({} as any));
await server.listen(callback);
expect(bindEventsStub.called).to.be.true;
});
it('should call "client.start"', async () => {
await server.listen(callback);
expect(client.producer.called).to.be.true;
});
@@ -81,10 +109,45 @@ describe('ServerKafka', () => {
describe('bindEvents', () => {
it('should not call subscribe nor run on consumer when there are no messageHandlers', async () => {
(server as any).logger = new NoopLogger();
await server.listen(callback);
await server.bindEvents(server.consumer);
expect(subscribe.called).to.be.not.true;
expect(run.called).to.be.not.true;
expect(connect.called).to.be.not.true;
expect(subscribe.called).to.be.false;
expect(run.called).to.be.true;
expect(connect.called).to.be.true;
});
it('should call subscribe and run on consumer when there are messageHandlers', async () => {
(server as any).logger = new NoopLogger();
await server.listen(callback);
await server.bindEvents(server.consumer);
server.addHandler('example.pattern', messageHandler, false);
await server.bindEvents(server.consumer);
expect(subscribe.called).to.be.true;
expect(run.called).to.be.true;
expect(connect.called).to.be.true;
});
});
describe('Serializer', () => {
it('should serialize and deserialize the payload', async () => {
const serializedMsg = KafkaSerializer.serialize<KafkaMessage>(msg);
const unserializedMsg = KafkaSerializer.deserialize<KafkaMessage>(serializedMsg);
expect(unserializedMsg).to.be.deep.eq(msg);
});
});
describe('Messaging', () => {
beforeEach(() => {
sinon.stub(server, 'getHandlerByPattern')
.callsFake(() => messageHandler);
});
it('should handleMessage correctly', async () => {
await server.listen(callback);
const payload: EachMessagePayload = {
message: msg,
partition: 1,
topic: 'test-topic'
};
await server.handleMessage(payload);
});
});
});