bugfix(ws/microservices): add catchError() to observables

This commit is contained in:
Kamil Myśliwiec
2018-12-06 11:47:14 +01:00
parent c6ec8878e0
commit efb367e65c
8 changed files with 177 additions and 22 deletions

View File

@@ -73,6 +73,15 @@ describe('NATS transport', () => {
.expect(200, 'true'); .expect(200, 'true');
}); });
it(`/GET (exception)`, () => {
return request(server)
.get('/exception')
.expect(200, {
message: 'test',
status: 'error',
});
});
afterEach(async () => { afterEach(async () => {
await app.close(); await app.close();
}); });

View File

@@ -1,12 +1,13 @@
import { Body, Controller, HttpCode, Post, Query } from '@nestjs/common'; import { Body, Controller, Get, HttpCode, Post, Query } from '@nestjs/common';
import { import {
Client, Client,
ClientProxy, ClientProxy,
MessagePattern, MessagePattern,
RpcException,
Transport, Transport,
} from '@nestjs/microservices'; } from '@nestjs/microservices';
import { from, Observable, of } from 'rxjs'; import { from, Observable, of, throwError } from 'rxjs';
import { scan } from 'rxjs/operators'; import { catchError, scan } from 'rxjs/operators';
@Controller() @Controller()
export class NatsController { export class NatsController {
@@ -71,4 +72,16 @@ export class NatsController {
streaming(data: number[]): Observable<number> { streaming(data: number[]): Observable<number> {
return from(data); return from(data);
} }
@Get('exception')
async getError() {
return await this.client
.send<number>('exception', {})
.pipe(catchError(err => of(err)));
}
@MessagePattern('exception')
throwError(): Observable<number> {
return throwError(new RpcException('test'));
}
} }

View File

@@ -0,0 +1,35 @@
import { INestApplication } from '@nestjs/common';
import { Test } from '@nestjs/testing';
import { expect } from 'chai';
import * as io from 'socket.io-client';
import { ErrorGateway } from '../src/error.gateway';
describe('ErrorGateway', () => {
let app: INestApplication;
beforeEach(async () => {
const testingModule = await Test.createTestingModule({
providers: [ErrorGateway],
}).compile();
app = await testingModule.createNestApplication();
await app.listenAsync(3000);
});
it(`should handle error`, async () => {
const ws = io.connect('http://localhost:8080');
ws.emit('push', {
test: 'test',
});
await new Promise(resolve =>
ws.on('exception', data => {
expect(data).to.be.eql({
status: 'error',
message: 'test',
});
resolve();
}),
);
});
afterEach(() => app.close());
});

View File

@@ -0,0 +1,14 @@
import {
SubscribeMessage,
WebSocketGateway,
WsException,
} from '@nestjs/websockets';
import { throwError } from 'rxjs';
@WebSocketGateway(8080)
export class ErrorGateway {
@SubscribeMessage('push')
onPush(client, data) {
return throwError(new WsException('test'));
}
}

View File

@@ -1,5 +1,7 @@
import { isFunction } from '@nestjs/common/utils/shared.utils';
import { ExecutionContextHost } from '@nestjs/core/helpers/execution-context.host'; import { ExecutionContextHost } from '@nestjs/core/helpers/execution-context.host';
import { Observable } from 'rxjs'; import { Observable } from 'rxjs';
import { catchError } from 'rxjs/operators';
import { RpcExceptionsHandler } from '../exceptions/rpc-exceptions-handler'; import { RpcExceptionsHandler } from '../exceptions/rpc-exceptions-handler';
export class RpcProxy { export class RpcProxy {
@@ -9,11 +11,30 @@ export class RpcProxy {
): (...args) => Promise<Observable<any>> { ): (...args) => Promise<Observable<any>> {
return async (...args) => { return async (...args) => {
try { try {
return await targetCallback(...args); const result = await targetCallback(...args);
} catch (e) { return !this.isObservable(result)
const host = new ExecutionContextHost(args); ? result
return exceptionsHandler.handle(e, host); : result.pipe(
catchError(error =>
this.handleError(exceptionsHandler, args, error),
),
);
} catch (error) {
return this.handleError(exceptionsHandler, args, error);
} }
}; };
} }
handleError<T>(
exceptionsHandler: RpcExceptionsHandler,
args: any[],
error: T,
): Observable<any> {
const host = new ExecutionContextHost(args);
return exceptionsHandler.handle(error, host);
}
isObservable(result: any): boolean {
return result && isFunction(result.subscribe);
}
} }

View File

@@ -1,9 +1,9 @@
import * as sinon from 'sinon';
import { expect } from 'chai'; import { expect } from 'chai';
import { of, throwError } from 'rxjs';
import * as sinon from 'sinon';
import { RpcProxy } from '../../context/rpc-proxy'; import { RpcProxy } from '../../context/rpc-proxy';
import { RpcExceptionsHandler } from '../../exceptions/rpc-exceptions-handler';
import { RpcException } from '../../exceptions/rpc-exception'; import { RpcException } from '../../exceptions/rpc-exception';
import { Observable, of } from 'rxjs'; import { RpcExceptionsHandler } from '../../exceptions/rpc-exceptions-handler';
describe('RpcProxy', () => { describe('RpcProxy', () => {
let routerProxy: RpcProxy; let routerProxy: RpcProxy;
@@ -18,10 +18,7 @@ describe('RpcProxy', () => {
describe('create', () => { describe('create', () => {
it('should method return thunk', async () => { it('should method return thunk', async () => {
const proxy = await routerProxy.create( const proxy = await routerProxy.create(async data => of(true), handler);
async data => of(true),
handler,
);
expect(typeof proxy === 'function').to.be.true; expect(typeof proxy === 'function').to.be.true;
}); });
@@ -33,5 +30,26 @@ describe('RpcProxy', () => {
await proxy(null); await proxy(null);
expectation.verify(); expectation.verify();
}); });
it('should attach "catchError" operator when observable was returned', async () => {
const expectation = handlerMock.expects('handle').once();
const proxy = routerProxy.create(async (client, data) => {
return throwError(new RpcException('test'));
}, handler);
(await proxy(null, null)).subscribe(null, () => expectation.verify());
});
});
describe('isObservable', () => {
describe('when observable', () => {
it('should return true', () => {
expect(routerProxy.isObservable(of('test'))).to.be.true;
});
});
describe('when not observable', () => {
it('should return false', () => {
expect(routerProxy.isObservable({})).to.be.false;
});
});
}); });
}); });

View File

@@ -1,18 +1,41 @@
import { isFunction } from '@nestjs/common/utils/shared.utils';
import { ExecutionContextHost } from '@nestjs/core/helpers/execution-context.host'; import { ExecutionContextHost } from '@nestjs/core/helpers/execution-context.host';
import { empty } from 'rxjs';
import { catchError } from 'rxjs/operators';
import { WsExceptionsHandler } from '../exceptions/ws-exceptions-handler'; import { WsExceptionsHandler } from '../exceptions/ws-exceptions-handler';
export class WsProxy { export class WsProxy {
public create( public create(
targetCallback: (...args) => Promise<void>, targetCallback: (...args) => Promise<any>,
exceptionsHandler: WsExceptionsHandler, exceptionsHandler: WsExceptionsHandler,
): (...args) => Promise<void> { ): (...args) => Promise<any> {
return async (...args) => { return async (...args) => {
try { try {
return await targetCallback(...args); const result = await targetCallback(...args);
} catch (e) { return !this.isObservable(result)
const host = new ExecutionContextHost(args); ? result
exceptionsHandler.handle(e, host); : result.pipe(
catchError(error => {
this.handleError(exceptionsHandler, args, error);
return empty();
}),
);
} catch (error) {
this.handleError(exceptionsHandler, args, error);
} }
}; };
} }
handleError<T>(
exceptionsHandler: WsExceptionsHandler,
args: any[],
error: T,
) {
const host = new ExecutionContextHost(args);
exceptionsHandler.handle(error, host);
}
isObservable(result: any): boolean {
return result && isFunction(result.subscribe);
}
} }

View File

@@ -1,8 +1,9 @@
import * as sinon from 'sinon';
import { expect } from 'chai'; import { expect } from 'chai';
import { of, throwError } from 'rxjs';
import * as sinon from 'sinon';
import { WsProxy } from '../../context/ws-proxy'; import { WsProxy } from '../../context/ws-proxy';
import { WsExceptionsHandler } from '../../exceptions/ws-exceptions-handler';
import { WsException } from '../../exceptions/ws-exception'; import { WsException } from '../../exceptions/ws-exception';
import { WsExceptionsHandler } from '../../exceptions/ws-exceptions-handler';
describe('WsProxy', () => { describe('WsProxy', () => {
let routerProxy: WsProxy; let routerProxy: WsProxy;
@@ -30,5 +31,26 @@ describe('WsProxy', () => {
await proxy(null, null); await proxy(null, null);
expectation.verify(); expectation.verify();
}); });
it('should attach "catchError" operator when observable was returned', async () => {
const expectation = handlerMock.expects('handle').once();
const proxy = routerProxy.create(async (client, data) => {
return throwError(new WsException('test'));
}, handler);
(await proxy(null, null)).subscribe(null, () => expectation.verify());
});
});
describe('isObservable', () => {
describe('when observable', () => {
it('should return true', () => {
expect(routerProxy.isObservable(of('test'))).to.be.true;
});
});
describe('when not observable', () => {
it('should return false', () => {
expect(routerProxy.isObservable({})).to.be.false;
});
});
}); });
}); });