bugfix(): normalize TCP pattern (event)

This commit is contained in:
Kamil Myśliwiec
2019-03-26 11:28:19 +01:00
parent 6d9f15a2a8
commit d9a28f2253
4 changed files with 44 additions and 14 deletions

View File

@@ -1,7 +1,9 @@
import { INestApplication } from '@nestjs/common';
import { Transport } from '@nestjs/microservices';
import { Test } from '@nestjs/testing';
import { expect } from 'chai';
import * as request from 'supertest';
import { AppController } from '../src/app.controller';
import { ApplicationModule } from '../src/app.module';
describe('RPC transport', () => {
@@ -76,6 +78,18 @@ describe('RPC transport', () => {
.expect(500);
});
it(`/POST (event notification)`, done => {
request(server)
.post('/notify')
.send([1, 2, 3, 4, 5])
.end(() => {
setTimeout(() => {
expect(AppController.IS_NOTIFIED).to.be.true;
done();
}, 1000);
});
});
afterEach(async () => {
await app.close();
});

View File

@@ -1,15 +1,18 @@
import { Controller, Post, Body, Query, HttpCode } from '@nestjs/common';
import { Body, Controller, HttpCode, Post, Query } from '@nestjs/common';
import {
Client,
MessagePattern,
ClientProxy,
EventPattern,
MessagePattern,
Transport,
} from '@nestjs/microservices';
import { Observable, of, from } from 'rxjs';
import { from, Observable, of } from 'rxjs';
import { scan } from 'rxjs/operators';
@Controller()
export class AppController {
static IS_NOTIFIED = false;
@Client({ transport: Transport.TCP })
client: ClientProxy;
@@ -62,4 +65,14 @@ export class AppController {
streaming(data: number[]): Observable<number> {
return from(data);
}
@Post('notify')
async sendNotification(): Promise<any> {
return this.client.emit<number>('notification', true);
}
@EventPattern('notification')
eventHandler(data: boolean) {
AppController.IS_NOTIFIED = data;
}
}

View File

@@ -52,10 +52,7 @@ export class ClientTCP extends ClientProxy {
share(),
);
this.socket.connect(
this.port,
this.host,
);
this.socket.connect(this.port, this.host);
this.connection = source$.toPromise();
return this.connection;
}
@@ -122,6 +119,9 @@ export class ClientTCP extends ClientProxy {
}
protected async dispatchEvent(packet: ReadPacket): Promise<any> {
return this.socket.sendMessage(packet);
return this.socket.sendMessage({
...packet,
pattern: this.normalizePattern(packet.pattern),
});
}
}

View File

@@ -119,16 +119,19 @@ describe('ClientProxy', () => {
});
describe('createObserver', () => {
let testClient: TestClientProxy;
beforeEach(() => {
testClient = new TestClientProxy();
});
it(`should return function`, () => {
expect(typeof testClient['createObserver'](null)).to.be.eql('function');
const testClientProxy = new TestClientProxy();
expect(typeof testClientProxy['createObserver']({} as any)).to.be.eql(
'function',
);
});
describe('returned function calls', () => {
let testClient: TestClientProxy;
beforeEach(() => {
testClient = new TestClientProxy();
});
it(`"error" when first parameter is not null or undefined`, () => {
const err = 'test';
const error = sinon.spy();