resolve conflicts

This commit is contained in:
Kamil Myśliwiec
2019-09-03 13:30:12 +02:00
55 changed files with 4049 additions and 136 deletions

View File

@@ -0,0 +1,173 @@
import {
ArgumentsHost,
Catch,
ExceptionFilter,
HttpException,
INestApplication,
RpcExceptionFilter,
} from '@nestjs/common';
import { RpcException, Transport } from '@nestjs/microservices';
import { Test } from '@nestjs/testing';
import { expect } from 'chai';
import * as request from 'supertest';
import { KafkaController } from '../src/kafka/kafka.controller';
import { APP_FILTER } from '@nestjs/core';
import { Observable, throwError } from 'rxjs';
import { KafkaMessagesController } from '../src/kafka/kafka.messages.controller';
import { UserDto } from '../src/kafka/dtos/user.dto';
import { UserEntity } from '../src/kafka/entities/user.entity';
import { BusinessDto } from '../src/kafka/dtos/business.dto';
import { BusinessEntity } from '../src/kafka/entities/business.entity';
@Catch()
class KafkaExceptionFilter implements ExceptionFilter {
catch(exception: HttpException, host: ArgumentsHost): any {
console.log(exception);
}
}
@Catch()
class RpcErrorFilter implements RpcExceptionFilter {
catch(exception: RpcException): Observable<any> {
console.log(exception);
return throwError(exception);
}
}
describe('Kafka transport', () => {
let server;
let app: INestApplication;
it(`Start Kafka app`, async () => {
const module = await Test.createTestingModule({
controllers: [
KafkaController,
KafkaMessagesController,
],
providers: [
{
provide: APP_FILTER,
useClass: RpcErrorFilter,
},
{
provide: APP_FILTER,
useClass: KafkaExceptionFilter,
},
],
}).compile();
app = module.createNestApplication();
server = app.getHttpAdapter().getInstance();
app.connectMicroservice({
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
},
});
await app.startAllMicroservicesAsync();
await app.init();
}).timeout(30000);
it(`/POST (sync sum kafka message)`, () => {
return request(server)
.post('/mathSumSyncKafkaMessage')
.send([1, 2, 3, 4, 5])
.expect(200)
.expect(200, '15');
});
it(`/POST (sync sum kafka(ish) message without key and only the value)`, () => {
return request(server)
.post('/mathSumSyncWithoutKey')
.send([1, 2, 3, 4, 5])
.expect(200)
.expect(200, '15');
});
it(`/POST (sync sum plain object)`, () => {
return request(server)
.post('/mathSumSyncPlainObject')
.send([1, 2, 3, 4, 5])
.expect(200)
.expect(200, '15');
});
it(`/POST (sync sum array)`, () => {
return request(server)
.post('/mathSumSyncArray')
.send([1, 2, 3, 4, 5])
.expect(200)
.expect(200, '15');
});
it(`/POST (sync sum string)`, () => {
return request(server)
.post('/mathSumSyncString')
.send([1, 2, 3, 4, 5])
.expect(200)
.expect(200, '15');
});
it(`/POST (sync sum number)`, () => {
return request(server)
.post('/mathSumSyncNumber')
.send([12345])
.expect(200)
.expect(200, '15');
});
it(`/POST (async event notification)`, done => {
request(server)
.post('/notify')
.send()
.end(() => {
setTimeout(() => {
expect(KafkaController.IS_NOTIFIED).to.be.true;
done();
}, 1000);
});
});
const userDto: UserDto = {
email: 'enriquebenavidesm@gmail.com',
name: 'Ben',
phone: '1112223331',
years: 33,
};
const newUser: UserEntity = new UserEntity(userDto);
const businessDto: BusinessDto = {
name: 'Example',
phone: '2233441122',
user: newUser,
};
it(`/POST (sync command create user)`, () => {
return request(server)
.post('/user')
.send(userDto)
.expect(200);
});
it(`/POST (sync command create business`, () => {
return request(server)
.post('/business')
.send(businessDto)
.expect(200);
});
it(`/POST (sync command create user) Concurrency Test`, async () => {
const promises = [];
for (let concurrencyKey = 0; concurrencyKey < 100; concurrencyKey++) {
const innerUserDto = JSON.parse(JSON.stringify(userDto));
innerUserDto.name += `+${concurrencyKey}`;
promises.push(request(server).post('/user').send(userDto).expect(200));
}
await Promise.all(promises);
});
after(`Stopping Kafka app`, async () => {
await app.close();
});
}).timeout(30000);

View File

@@ -0,0 +1,7 @@
import { UserEntity } from '../entities/user.entity';
export class BusinessDto {
name: string;
phone: string;
user: UserEntity;
}

View File

@@ -0,0 +1,6 @@
export class UserDto {
name: string;
email: string;
phone: string;
years: number;
}

View File

@@ -0,0 +1,19 @@
import { UserEntity } from './user.entity';
import { BusinessDto } from '../dtos/business.dto';
export class BusinessEntity {
constructor(business: BusinessDto) {
this.id = Math.random() * 99999999;
this.name = business.name;
this.phone = business.phone;
this.createdBy = {
id: business.user.id,
};
this.created = new Date();
}
id: number;
name: string;
phone: string;
createdBy: Partial<UserEntity>;
created: Date;
}

View File

@@ -0,0 +1,18 @@
import { UserDto } from '../dtos/user.dto';
export class UserEntity {
constructor(user: UserDto) {
this.id = Math.random() * 99999999;
this.name = user.name;
this.email = user.email;
this.phone = user.phone;
this.years = user.years;
this.created = new Date();
}
id: number;
name: string;
email: string;
phone: string;
years: number;
created: Date;
}

View File

@@ -0,0 +1,148 @@
import * as util from 'util';
import { Body, Controller, HttpCode, Post, OnModuleInit } from '@nestjs/common';
import {
Client,
Transport,
ClientKafka,
} from '@nestjs/microservices';
import { Logger } from '@nestjs/common/services/logger.service';
import { Observable } from 'rxjs';
import { UserDto } from './dtos/user.dto';
import { BusinessDto } from './dtos/business.dto';
@Controller()
export class KafkaController implements OnModuleInit {
protected readonly logger = new Logger(KafkaController.name);
static IS_NOTIFIED = false;
static MATH_SUM = 0;
@Client({
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
},
})
private readonly client: ClientKafka;
onModuleInit(){
const requestPatterns = [
'math.sum.sync.kafka.message',
'math.sum.sync.without.key',
'math.sum.sync.plain.object',
'math.sum.sync.array',
'math.sum.sync.string',
'math.sum.sync.number',
'user.create',
'business.create',
];
requestPatterns.forEach((pattern) => {
this.client.subscribeToResponseOf(pattern);
});
}
// sync send kafka message
@Post('mathSumSyncKafkaMessage')
@HttpCode(200)
async mathSumSyncKafkaMessage(
@Body() data: number[],
): Promise<Observable<any>> {
const result = await this.client.send('math.sum.sync.kafka.message', {
key: '1',
value: {
numbers: data,
},
}).toPromise();
return result.value;
}
// sync send kafka(ish) message without key and only the value
@Post('mathSumSyncWithoutKey')
@HttpCode(200)
async mathSumSyncWithoutKey(
@Body() data: number[],
): Promise<Observable<any>> {
const result = await this.client.send('math.sum.sync.without.key', {
value: {
numbers: data,
},
}).toPromise();
return result.value;
}
// sync send message without key or value
@Post('mathSumSyncPlainObject')
@HttpCode(200)
async mathSumSyncPlainObject(
@Body() data: number[],
): Promise<Observable<any>> {
const result = await this.client.send('math.sum.sync.plain.object', {
numbers: data,
}).toPromise();
return result.value;
}
// sync send message without key or value
@Post('mathSumSyncArray')
@HttpCode(200)
async mathSumSyncArray(
@Body() data: number[],
): Promise<Observable<any>> {
const result = await this.client.send('math.sum.sync.array', data).toPromise();
return result.value;
}
@Post('mathSumSyncString')
@HttpCode(200)
async mathSumSyncString(
@Body() data: number[],
): Promise<Observable<any>> {
// this.logger.error(util.format('mathSumSyncString() data: %o', data));
const result = await this.client.send('math.sum.sync.string', data.toString()).toPromise();
return result.value;
}
@Post('mathSumSyncNumber')
@HttpCode(200)
async mathSumSyncNumber(
@Body() data: number[],
): Promise<Observable<any>> {
const result = await this.client.send('math.sum.sync.number', data[0]).toPromise();
return result.value;
}
// async notify
@Post('notify')
async sendNotification(): Promise<any> {
return this.client.emit('notify', {notify: true});
}
// Complex data to send.
@Post('/user')
@HttpCode(200)
async createUser(@Body() user: UserDto): Promise<Observable<any>> {
const result = await this.client.send('user.create', {
key: '1',
value: {
user,
},
}).toPromise();
return result.value;
}
// Complex data to send.
@Post('/business')
@HttpCode(200)
async createBusiness(@Body() business: BusinessDto) {
const result = await this.client.send('business.create', {
key: '1',
value: {
business,
},
}).toPromise();
return result.value;
}
}

View File

@@ -0,0 +1,70 @@
import { Controller } from '@nestjs/common';
import { Logger } from '@nestjs/common/services/logger.service';
import { EventPattern, MessagePattern } from '@nestjs/microservices';
import { BusinessDto } from './dtos/business.dto';
import { UserDto } from './dtos/user.dto';
import { BusinessEntity } from './entities/business.entity';
import { UserEntity } from './entities/user.entity';
import { KafkaController } from './kafka.controller';
@Controller()
export class KafkaMessagesController {
protected readonly logger = new Logger(KafkaMessagesController.name);
static IS_NOTIFIED = false;
@MessagePattern('math.sum.sync.kafka.message')
mathSumSyncKafkaMessage(data: any) {
return (data.value.numbers || []).reduce((a, b) => a + b);
}
@MessagePattern('math.sum.sync.without.key')
mathSumSyncWithoutKey(data: any) {
return (data.value.numbers || []).reduce((a, b) => a + b);
}
@MessagePattern('math.sum.sync.plain.object')
mathSumSyncPlainObject(data: any) {
return (data.value.numbers || []).reduce((a, b) => a + b);
}
@MessagePattern('math.sum.sync.array')
mathSumSyncArray(data: any) {
return (data.value || []).reduce((a, b) => a + b);
}
@MessagePattern('math.sum.sync.string')
mathSumSyncString(data: any) {
// this.logger.error(util.format('mathSumSyncString() data: %o', data));
return (data.value.split(',') || [])
.map(i => {
return parseFloat(i);
})
.reduce((a, b) => a + b);
}
@MessagePattern('math.sum.sync.number')
mathSumSyncNumber(data: any) {
// this.logger.error(util.format('mathSumSyncNumber() data: %o', data));
return (data.value.toString().split('') || [])
.map(i => {
return parseFloat(i);
})
.reduce((a, b) => a + b);
}
@EventPattern('notify')
eventHandler(data: any) {
KafkaController.IS_NOTIFIED = data.value.notify;
}
// Complex data to send.
@MessagePattern('user.create')
async createUser(params: { value: { user: UserDto } }) {
return new UserEntity(params.value.user);
}
@MessagePattern('business.create')
async createBusiness(params: { value: { business: BusinessDto } }) {
return new BusinessEntity(params.value.business);
}
}