mirror of
https://github.com/nestjs/nest.git
synced 2026-02-21 23:11:44 +00:00
Merge pull request #8851 from micalevisk/fix-issue-8595
Use `isObervable` from `rxjs` instead of our own implementation. Drop few deprecated APIs from 3rd-party libs. And other minor refactorings
This commit is contained in:
@@ -134,8 +134,8 @@ export class AdvancedGrpcController {
|
||||
async streamReq(messages: Observable<any>): Promise<any> {
|
||||
const s = new Subject();
|
||||
const o = s.asObservable();
|
||||
messages.subscribe(
|
||||
msg => {
|
||||
messages.subscribe({
|
||||
next: () => {
|
||||
s.next({
|
||||
id: 1,
|
||||
itemTypes: [1],
|
||||
@@ -146,9 +146,8 @@ export class AdvancedGrpcController {
|
||||
},
|
||||
});
|
||||
},
|
||||
null,
|
||||
() => s.complete(),
|
||||
);
|
||||
complete: () => s.complete(),
|
||||
});
|
||||
return o;
|
||||
}
|
||||
|
||||
|
||||
@@ -50,16 +50,16 @@ export class GrpcController {
|
||||
@GrpcStreamMethod('Math')
|
||||
async sumStream(messages: Observable<any>): Promise<any> {
|
||||
return new Promise<any>((resolve, reject) => {
|
||||
messages.subscribe(
|
||||
msg => {
|
||||
messages.subscribe({
|
||||
next: msg => {
|
||||
resolve({
|
||||
result: msg.data.reduce((a, b) => a + b),
|
||||
});
|
||||
},
|
||||
err => {
|
||||
error: err => {
|
||||
reject(err);
|
||||
},
|
||||
);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ import {
|
||||
ArgumentMetadata,
|
||||
PipeTransform,
|
||||
} from '../interfaces/features/pipe-transform.interface';
|
||||
import { isNil } from '../utils/shared.utils';
|
||||
import { isNil, isNumber } from '../utils/shared.utils';
|
||||
|
||||
/**
|
||||
* Defines the built-in DefaultValue Pipe
|
||||
@@ -21,7 +21,7 @@ export class DefaultValuePipe<T = any, R = any>
|
||||
transform(value?: T, _metadata?: ArgumentMetadata): T | R {
|
||||
if (
|
||||
isNil(value) ||
|
||||
(typeof value === 'number' && isNaN(value as unknown as number))
|
||||
(isNumber(value) && isNaN(value as unknown as number))
|
||||
) {
|
||||
return this.defaultValue;
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import {
|
||||
isEmpty,
|
||||
isFunction,
|
||||
isNil,
|
||||
isNumber,
|
||||
isObject,
|
||||
isPlainObject,
|
||||
isString,
|
||||
@@ -66,15 +67,33 @@ describe('Shared utils', () => {
|
||||
});
|
||||
});
|
||||
describe('isString', () => {
|
||||
it('should return true when obj is a string', () => {
|
||||
it('should return true when val is a string', () => {
|
||||
expect(isString('true')).to.be.true;
|
||||
});
|
||||
it('should return false when object is not a string', () => {
|
||||
it('should return false when val is not a string', () => {
|
||||
expect(isString(new String('fine'))).to.be.false;
|
||||
expect(isString(false)).to.be.false;
|
||||
expect(isString(null)).to.be.false;
|
||||
expect(isString(undefined)).to.be.false;
|
||||
});
|
||||
});
|
||||
describe('isNumber', () => {
|
||||
it('should return true when val is a number or NaN', () => {
|
||||
expect(isNumber(1)).to.be.true;
|
||||
expect(isNumber(1.23)).to.be.true; // with decimals
|
||||
expect(isNumber(123e-5)).to.be.true; // scientific (exponent) notation
|
||||
expect(isNumber(0o1)).to.be.true; // octal notation
|
||||
expect(isNumber(0b1)).to.be.true; // binary notation
|
||||
expect(isNumber(0x1)).to.be.true; // hexadecimal notation
|
||||
expect(isNumber(NaN)).to.be.true;
|
||||
});
|
||||
it('should return false when val is not a number', () => {
|
||||
// expect(isNumber(1n)).to.be.false; // big int (available on ES2020)
|
||||
expect(isNumber('1')).to.be.false; // string
|
||||
expect(isNumber(undefined)).to.be.false; // nullish
|
||||
expect(isNumber(null)).to.be.false; // nullish
|
||||
});
|
||||
});
|
||||
describe('isConstructor', () => {
|
||||
it('should return true when string is equal to constructor', () => {
|
||||
expect(isConstructor('constructor')).to.be.true;
|
||||
|
||||
@@ -47,10 +47,11 @@ export const normalizePath = (path?: string): string =>
|
||||
export const stripEndSlash = (path: string) =>
|
||||
path[path.length - 1] === '/' ? path.slice(0, path.length - 1) : path;
|
||||
|
||||
export const isFunction = (fn: any): boolean => typeof fn === 'function';
|
||||
export const isString = (fn: any): fn is string => typeof fn === 'string';
|
||||
export const isConstructor = (fn: any): boolean => fn === 'constructor';
|
||||
export const isNil = (obj: any): obj is null | undefined =>
|
||||
isUndefined(obj) || obj === null;
|
||||
export const isFunction = (val: any): boolean => typeof val === 'function';
|
||||
export const isString = (val: any): val is string => typeof val === 'string';
|
||||
export const isNumber = (val: any): val is number => typeof val === 'number';
|
||||
export const isConstructor = (val: any): boolean => val === 'constructor';
|
||||
export const isNil = (val: any): val is null | undefined =>
|
||||
isUndefined(val) || val === null;
|
||||
export const isEmpty = (array: any): boolean => !(array && array.length > 0);
|
||||
export const isSymbol = (fn: any): fn is symbol => typeof fn === 'symbol';
|
||||
export const isSymbol = (val: any): val is symbol => typeof val === 'symbol';
|
||||
|
||||
@@ -6,7 +6,7 @@ import {
|
||||
PipeTransform,
|
||||
} from '@nestjs/common/interfaces';
|
||||
import { isEmpty, isFunction } from '@nestjs/common/utils/shared.utils';
|
||||
import { lastValueFrom } from 'rxjs';
|
||||
import { lastValueFrom, isObservable } from 'rxjs';
|
||||
import { ExternalExceptionFilterContext } from '../exceptions/external-exception-filter-context';
|
||||
import { FORBIDDEN_MESSAGE } from '../guards/constants';
|
||||
import { GuardsConsumer } from '../guards/guards-consumer';
|
||||
@@ -329,7 +329,7 @@ export class ExternalContextCreator {
|
||||
}
|
||||
|
||||
public async transformToResult(resultOrDeferred: any) {
|
||||
if (resultOrDeferred && isFunction(resultOrDeferred.subscribe)) {
|
||||
if (isObservable(resultOrDeferred)) {
|
||||
return lastValueFrom(resultOrDeferred);
|
||||
}
|
||||
return resultOrDeferred;
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import { isFunction } from '@nestjs/common/utils/shared.utils';
|
||||
import { ExecutionContextHost } from '@nestjs/core/helpers/execution-context-host';
|
||||
import { Observable } from 'rxjs';
|
||||
import { Observable, isObservable } from 'rxjs';
|
||||
import { catchError } from 'rxjs/operators';
|
||||
import { RpcExceptionsHandler } from '../exceptions/rpc-exceptions-handler';
|
||||
|
||||
@@ -12,7 +11,7 @@ export class RpcProxy {
|
||||
return async (...args: unknown[]) => {
|
||||
try {
|
||||
const result = await targetCallback(...args);
|
||||
return !this.isObservable(result)
|
||||
return !isObservable(result)
|
||||
? result
|
||||
: result.pipe(
|
||||
catchError(error =>
|
||||
@@ -34,8 +33,4 @@ export class RpcProxy {
|
||||
host.setType('rpc');
|
||||
return exceptionsHandler.handle(error, host);
|
||||
}
|
||||
|
||||
isObservable(result: any): boolean {
|
||||
return result && isFunction(result.subscribe);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ export class BaseRpcExceptionFilter<T = any, R = any>
|
||||
}
|
||||
const res = exception.getError();
|
||||
const message = isObject(res) ? res : { status, message: res };
|
||||
return _throw(message);
|
||||
return _throw(() => message);
|
||||
}
|
||||
|
||||
public handleUnknownError(exception: T, status: string) {
|
||||
@@ -29,7 +29,7 @@ export class BaseRpcExceptionFilter<T = any, R = any>
|
||||
const logger = BaseRpcExceptionFilter.logger;
|
||||
logger.error.apply(logger, loggerArgs as any);
|
||||
|
||||
return _throw({ status, message: errorMessage });
|
||||
return _throw(() => ({ status, message: errorMessage }));
|
||||
}
|
||||
|
||||
public isError(exception: any): exception is Error {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { loadPackage } from '@nestjs/common/utils/load-package.util';
|
||||
import { isObject } from '@nestjs/common/utils/shared.utils';
|
||||
import { NatsCodec } from '../external/nats-client.interface';
|
||||
import { ReadPacket } from '../interfaces';
|
||||
import { Serializer } from '../interfaces/serializer.interface';
|
||||
@@ -20,9 +21,7 @@ export class NatsRecordSerializer
|
||||
|
||||
serialize(packet: ReadPacket | any): NatsRecord {
|
||||
const natsMessage =
|
||||
packet?.data &&
|
||||
typeof packet.data === 'object' &&
|
||||
packet.data instanceof NatsRecord
|
||||
packet?.data && isObject(packet.data) && packet.data instanceof NatsRecord
|
||||
? (packet.data as NatsRecord)
|
||||
: new NatsRecordBuilder(packet?.data).build();
|
||||
|
||||
|
||||
@@ -220,10 +220,10 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
|
||||
public createUnaryServiceMethod(methodHandler: Function): Function {
|
||||
return async (call: GrpcCall, callback: Function) => {
|
||||
const handler = methodHandler(call.request, call.metadata, call);
|
||||
this.transformToObservable(await handler).subscribe(
|
||||
data => callback(null, data),
|
||||
(err: any) => callback(err),
|
||||
);
|
||||
this.transformToObservable(await handler).subscribe({
|
||||
next: data => callback(null, data),
|
||||
error: (err: any) => callback(err),
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -184,7 +184,7 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
|
||||
|
||||
const response$ = this.transformToObservable(
|
||||
await handler(packet.data, kafkaContext),
|
||||
) as Observable<any>;
|
||||
);
|
||||
response$ && this.send(response$, publish);
|
||||
}
|
||||
|
||||
|
||||
@@ -123,7 +123,7 @@ export class ServerMqtt extends Server implements CustomTransportStrategy {
|
||||
}
|
||||
const response$ = this.transformToObservable(
|
||||
await handler(packet.data, mqttContext),
|
||||
) as Observable<any>;
|
||||
);
|
||||
response$ && this.send(response$, publish);
|
||||
}
|
||||
|
||||
@@ -191,7 +191,8 @@ export class ServerMqtt extends Server implements CustomTransportStrategy {
|
||||
|
||||
for (const [key, value] of this.messageHandlers) {
|
||||
if (
|
||||
!key.includes(MQTT_WILDCARD_SINGLE) && !key.includes(MQTT_WILDCARD_ALL)
|
||||
!key.includes(MQTT_WILDCARD_SINGLE) &&
|
||||
!key.includes(MQTT_WILDCARD_ALL)
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -109,7 +109,7 @@ export class ServerNats extends Server implements CustomTransportStrategy {
|
||||
}
|
||||
const response$ = this.transformToObservable(
|
||||
await handler(message.data, natsCtx),
|
||||
) as Observable<any>;
|
||||
);
|
||||
response$ && this.send(response$, publish);
|
||||
}
|
||||
|
||||
|
||||
@@ -121,7 +121,7 @@ export class ServerRedis extends Server implements CustomTransportStrategy {
|
||||
}
|
||||
const response$ = this.transformToObservable(
|
||||
await handler(packet.data, redisCtx),
|
||||
) as Observable<any>;
|
||||
);
|
||||
response$ && this.send(response$, publish);
|
||||
}
|
||||
|
||||
|
||||
@@ -157,7 +157,7 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
|
||||
}
|
||||
const response$ = this.transformToObservable(
|
||||
await handler(packet.data, rmqContext),
|
||||
) as Observable<any>;
|
||||
);
|
||||
|
||||
const publish = <T>(data: T) =>
|
||||
this.sendMessage(data, properties.replyTo, properties.correlationId);
|
||||
|
||||
@@ -90,7 +90,7 @@ export class ServerTCP extends Server implements CustomTransportStrategy {
|
||||
}
|
||||
const response$ = this.transformToObservable(
|
||||
await handler(packet.data, tcpContext),
|
||||
) as Observable<any>;
|
||||
);
|
||||
|
||||
response$ &&
|
||||
this.send(response$, data => {
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
import { Logger, LoggerService } from '@nestjs/common/services/logger.service';
|
||||
import { loadPackage } from '@nestjs/common/utils/load-package.util';
|
||||
import { isFunction } from '@nestjs/common/utils/shared.utils';
|
||||
import {
|
||||
connectable,
|
||||
EMPTY as empty,
|
||||
EMPTY,
|
||||
from as fromPromise,
|
||||
isObservable,
|
||||
Observable,
|
||||
ObservedValueOf,
|
||||
of,
|
||||
Subject,
|
||||
Subscription,
|
||||
@@ -94,7 +95,7 @@ export abstract class Server {
|
||||
.pipe(
|
||||
catchError((err: any) => {
|
||||
scheduleOnNextTick({ err });
|
||||
return empty;
|
||||
return EMPTY;
|
||||
}),
|
||||
finalize(() => scheduleOnNextTick({ isDisposed: true })),
|
||||
)
|
||||
@@ -113,7 +114,7 @@ export abstract class Server {
|
||||
);
|
||||
}
|
||||
const resultOrStream = await handler(packet.data, context);
|
||||
if (this.isObservable(resultOrStream)) {
|
||||
if (isObservable(resultOrStream)) {
|
||||
const connectableSource = connectable(resultOrStream, {
|
||||
connector: () => new Subject(),
|
||||
resetOnDisconnect: false,
|
||||
@@ -122,13 +123,24 @@ export abstract class Server {
|
||||
}
|
||||
}
|
||||
|
||||
public transformToObservable<T = any>(resultOrDeferred: any): Observable<T> {
|
||||
public transformToObservable<T>(
|
||||
resultOrDeferred: Observable<T> | Promise<T>,
|
||||
): Observable<T>;
|
||||
public transformToObservable<T>(
|
||||
resultOrDeferred: T,
|
||||
): never extends Observable<ObservedValueOf<T>>
|
||||
? Observable<T>
|
||||
: Observable<ObservedValueOf<T>>;
|
||||
public transformToObservable(resultOrDeferred: any) {
|
||||
if (resultOrDeferred instanceof Promise) {
|
||||
return fromPromise(resultOrDeferred);
|
||||
} else if (!this.isObservable(resultOrDeferred)) {
|
||||
return of(resultOrDeferred);
|
||||
}
|
||||
return resultOrDeferred;
|
||||
|
||||
if (isObservable(resultOrDeferred)) {
|
||||
return resultOrDeferred;
|
||||
}
|
||||
|
||||
return of(resultOrDeferred);
|
||||
}
|
||||
|
||||
public getOptionsProp<
|
||||
@@ -180,10 +192,6 @@ export abstract class Server {
|
||||
new IncomingRequestDeserializer();
|
||||
}
|
||||
|
||||
private isObservable(input: unknown): input is Observable<any> {
|
||||
return input && isFunction((input as Observable<any>).subscribe);
|
||||
}
|
||||
|
||||
/**
|
||||
* Transforms the server Pattern to valid type and returns a route for him.
|
||||
*
|
||||
|
||||
@@ -129,10 +129,10 @@ describe('ClientGrpcProxy', () => {
|
||||
|
||||
it('should call native method', () => {
|
||||
const spy = sinon.spy(obj, methodName);
|
||||
stream$.subscribe(
|
||||
() => ({}),
|
||||
() => ({}),
|
||||
);
|
||||
stream$.subscribe({
|
||||
next: () => ({}),
|
||||
error: () => ({}),
|
||||
});
|
||||
|
||||
expect(spy.called).to.be.true;
|
||||
});
|
||||
@@ -156,10 +156,10 @@ describe('ClientGrpcProxy', () => {
|
||||
|
||||
it('should subscribe to request upstream', () => {
|
||||
const upstreamSubscribe = sinon.spy(upstream, 'subscribe');
|
||||
stream$.subscribe(
|
||||
() => ({}),
|
||||
() => ({}),
|
||||
);
|
||||
stream$.subscribe({
|
||||
next: () => ({}),
|
||||
error: () => ({}),
|
||||
});
|
||||
upstream.next({ test: true });
|
||||
|
||||
expect(writeSpy.called).to.be.true;
|
||||
@@ -201,7 +201,12 @@ describe('ClientGrpcProxy', () => {
|
||||
|
||||
it('propagates server errors', () => {
|
||||
const err = new Error('something happened');
|
||||
stream$.subscribe(dataSpy, errorSpy, completeSpy);
|
||||
stream$.subscribe({
|
||||
next: dataSpy,
|
||||
error: errorSpy,
|
||||
complete: completeSpy,
|
||||
});
|
||||
|
||||
eventCallbacks.data('a');
|
||||
eventCallbacks.data('b');
|
||||
callMock.finished = true;
|
||||
@@ -219,7 +224,11 @@ describe('ClientGrpcProxy', () => {
|
||||
const grpcServerCancelErrMock = {
|
||||
details: 'Cancelled',
|
||||
};
|
||||
const subscription = stream$.subscribe(dataSpy, errorSpy);
|
||||
const subscription = stream$.subscribe({
|
||||
next: dataSpy,
|
||||
error: errorSpy,
|
||||
});
|
||||
|
||||
eventCallbacks.data('a');
|
||||
eventCallbacks.data('b');
|
||||
subscription.unsubscribe();
|
||||
@@ -258,10 +267,10 @@ describe('ClientGrpcProxy', () => {
|
||||
|
||||
it('should call native method', () => {
|
||||
const spy = sinon.spy(obj, methodName);
|
||||
stream$.subscribe(
|
||||
() => ({}),
|
||||
() => ({}),
|
||||
);
|
||||
stream$.subscribe({
|
||||
next: () => ({}),
|
||||
error: () => ({}),
|
||||
});
|
||||
|
||||
expect(spy.called).to.be.true;
|
||||
});
|
||||
@@ -298,10 +307,10 @@ describe('ClientGrpcProxy', () => {
|
||||
|
||||
it('should subscribe to request upstream', () => {
|
||||
const upstreamSubscribe = sinon.spy(upstream, 'subscribe');
|
||||
stream$.subscribe(
|
||||
() => ({}),
|
||||
() => ({}),
|
||||
);
|
||||
stream$.subscribe({
|
||||
next: () => ({}),
|
||||
error: () => ({}),
|
||||
});
|
||||
upstream.next({ test: true });
|
||||
|
||||
expect(writeSpy.called).to.be.true;
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { expect } from 'chai';
|
||||
import { empty } from 'rxjs';
|
||||
import { EMPTY } from 'rxjs';
|
||||
import * as sinon from 'sinon';
|
||||
import { ClientMqtt } from '../../client/client-mqtt';
|
||||
import { ERROR_EVENT } from '../../constants';
|
||||
@@ -311,9 +311,9 @@ describe('ClientMqtt', () => {
|
||||
on: (ev, callback) => callback(error),
|
||||
off: () => ({}),
|
||||
};
|
||||
client
|
||||
.mergeCloseEvent(instance as any, empty())
|
||||
.subscribe(null, (err: any) => expect(err).to.be.eql(error));
|
||||
client.mergeCloseEvent(instance as any, EMPTY).subscribe({
|
||||
error: (err: any) => expect(err).to.be.eql(error),
|
||||
});
|
||||
});
|
||||
});
|
||||
describe('handleError', () => {
|
||||
|
||||
@@ -96,12 +96,12 @@ describe('ClientProxy', function () {
|
||||
throw new Error();
|
||||
});
|
||||
const stream$ = client.send({ test: 3 }, 'test');
|
||||
stream$.subscribe(
|
||||
() => {},
|
||||
err => {
|
||||
stream$.subscribe({
|
||||
next: () => {},
|
||||
error: err => {
|
||||
expect(err).to.be.instanceof(Error);
|
||||
},
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
describe('when is connected', () => {
|
||||
@@ -142,12 +142,12 @@ describe('ClientProxy', function () {
|
||||
throw new Error();
|
||||
});
|
||||
const stream$ = client.emit({ test: 3 }, 'test');
|
||||
stream$.subscribe(
|
||||
() => {},
|
||||
err => {
|
||||
stream$.subscribe({
|
||||
next: () => {},
|
||||
error: err => {
|
||||
expect(err).to.be.instanceof(Error);
|
||||
},
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
describe('when is connected', () => {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { expect } from 'chai';
|
||||
import { EventEmitter } from 'events';
|
||||
import { empty } from 'rxjs';
|
||||
import { EMPTY } from 'rxjs';
|
||||
import * as sinon from 'sinon';
|
||||
import { ClientRMQ } from '../../client/client-rmq';
|
||||
import { ReadPacket } from '../../interfaces';
|
||||
@@ -164,8 +164,8 @@ describe('ClientRMQ', function () {
|
||||
off: () => ({}),
|
||||
};
|
||||
client
|
||||
.mergeDisconnectEvent(instance as any, empty())
|
||||
.subscribe(null, (err: any) => expect(err).to.be.eql(error));
|
||||
.mergeDisconnectEvent(instance as any, EMPTY)
|
||||
.subscribe({ error: (err: any) => expect(err).to.be.eql(error) });
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -36,19 +36,8 @@ describe('RpcProxy', () => {
|
||||
const proxy = routerProxy.create(async (client, data) => {
|
||||
return throwError(() => new RpcException('test'));
|
||||
}, handler);
|
||||
(await proxy(null, null)).subscribe(null, () => expectation.verify());
|
||||
});
|
||||
});
|
||||
|
||||
describe('isObservable', () => {
|
||||
describe('when observable', () => {
|
||||
it('should return true', () => {
|
||||
expect(routerProxy.isObservable(of('test'))).to.be.true;
|
||||
});
|
||||
});
|
||||
describe('when not observable', () => {
|
||||
it('should return false', () => {
|
||||
expect(routerProxy.isObservable({})).to.be.false;
|
||||
(await proxy(null, null)).subscribe({
|
||||
error: () => expectation.verify(),
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { expect } from 'chai';
|
||||
import { EMPTY as empty, of } from 'rxjs';
|
||||
import { EMPTY, of } from 'rxjs';
|
||||
import { catchError } from 'rxjs/operators';
|
||||
import * as sinon from 'sinon';
|
||||
import { RpcException } from '../../exceptions/rpc-exception';
|
||||
@@ -23,7 +23,7 @@ describe('RpcExceptionsHandler', () => {
|
||||
message: 'Internal server error',
|
||||
});
|
||||
done();
|
||||
return empty;
|
||||
return EMPTY;
|
||||
}),
|
||||
)
|
||||
.subscribe(() => ({}));
|
||||
@@ -39,7 +39,7 @@ describe('RpcExceptionsHandler', () => {
|
||||
catchError((err: any) => {
|
||||
expect(err).to.be.eql(message);
|
||||
done();
|
||||
return empty;
|
||||
return EMPTY;
|
||||
}),
|
||||
)
|
||||
.subscribe(() => ({}));
|
||||
@@ -53,7 +53,7 @@ describe('RpcExceptionsHandler', () => {
|
||||
catchError((err: any) => {
|
||||
expect(err).to.be.eql({ message, status: 'error' });
|
||||
done();
|
||||
return empty;
|
||||
return EMPTY;
|
||||
}),
|
||||
)
|
||||
.subscribe(() => ({}));
|
||||
|
||||
@@ -112,7 +112,7 @@ describe('ServerMqtt', () => {
|
||||
const handleEventSpy = sinon.spy(server, 'handleEvent');
|
||||
await server.handleMessage(
|
||||
channel,
|
||||
new Buffer(JSON.stringify({ pattern: '', data })),
|
||||
Buffer.from(JSON.stringify({ pattern: '', data })),
|
||||
null,
|
||||
);
|
||||
expect(handleEventSpy.called).to.be.true;
|
||||
@@ -120,7 +120,7 @@ describe('ServerMqtt', () => {
|
||||
it(`should publish NO_MESSAGE_HANDLER if pattern not exists in messageHandlers object`, async () => {
|
||||
await server.handleMessage(
|
||||
channel,
|
||||
new Buffer(JSON.stringify({ id, pattern: '', data })),
|
||||
Buffer.from(JSON.stringify({ id, pattern: '', data })),
|
||||
null,
|
||||
);
|
||||
expect(
|
||||
@@ -139,7 +139,7 @@ describe('ServerMqtt', () => {
|
||||
|
||||
await server.handleMessage(
|
||||
channel,
|
||||
new Buffer(JSON.stringify({ pattern: '', data, id: '2' })),
|
||||
Buffer.from(JSON.stringify({ pattern: '', data, id: '2' })),
|
||||
null,
|
||||
);
|
||||
expect(handler.calledWith(data)).to.be.true;
|
||||
|
||||
@@ -116,7 +116,7 @@ describe('Server', () => {
|
||||
});
|
||||
describe('throws exception', () => {
|
||||
beforeEach(() => {
|
||||
server.send(_throw('test') as any, sendSpy);
|
||||
server.send(_throw(() => 'test') as any, sendSpy);
|
||||
});
|
||||
it('should send error and complete', () => {
|
||||
process.nextTick(() => {
|
||||
@@ -149,7 +149,7 @@ describe('Server', () => {
|
||||
describe('transformToObservable', () => {
|
||||
describe('when resultOrDeferred', () => {
|
||||
describe('is Promise', () => {
|
||||
it('should return Observable', async () => {
|
||||
it('should return Observable that emits the resolved value of the supplied promise', async () => {
|
||||
const value = 100;
|
||||
expect(
|
||||
await lastValueFrom(
|
||||
@@ -159,21 +159,29 @@ describe('Server', () => {
|
||||
});
|
||||
});
|
||||
describe('is Observable', () => {
|
||||
it('should return Observable', async () => {
|
||||
it('should return the observable itself', async () => {
|
||||
const value = 100;
|
||||
expect(
|
||||
await lastValueFrom(server.transformToObservable(of(value))),
|
||||
).to.be.eq(100);
|
||||
});
|
||||
});
|
||||
describe('is value', () => {
|
||||
it('should return Observable', async () => {
|
||||
describe('is any number', () => {
|
||||
it('should return Observable that emits the supplied number', async () => {
|
||||
const value = 100;
|
||||
expect(
|
||||
await lastValueFrom(server.transformToObservable(value)),
|
||||
).to.be.eq(100);
|
||||
});
|
||||
});
|
||||
describe('is an array', () => {
|
||||
it('should return Observable that emits the supplied array', async () => {
|
||||
const value = [1, 2, 3];
|
||||
expect(
|
||||
await lastValueFrom(server.transformToObservable(value)),
|
||||
).to.be.eq(value);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -1,4 +1,8 @@
|
||||
import { isObject, isString } from '@nestjs/common/utils/shared.utils';
|
||||
import {
|
||||
isObject,
|
||||
isString,
|
||||
isNumber,
|
||||
} from '@nestjs/common/utils/shared.utils';
|
||||
import { MsPattern } from '../interfaces';
|
||||
|
||||
/**
|
||||
@@ -14,7 +18,7 @@ import { MsPattern } from '../interfaces';
|
||||
* @returns string
|
||||
*/
|
||||
export function transformPatternToRoute(pattern: MsPattern): string {
|
||||
if (isString(pattern) || typeof pattern === 'number') {
|
||||
if (isString(pattern) || isNumber(pattern)) {
|
||||
return `${pattern}`;
|
||||
}
|
||||
if (!isObject(pattern)) {
|
||||
|
||||
@@ -22,7 +22,10 @@ import {
|
||||
} from '@nestjs/common/utils/shared.utils';
|
||||
import { AbstractHttpAdapter } from '@nestjs/core/adapters/http-adapter';
|
||||
import { RouterMethodFactory } from '@nestjs/core/helpers/router-method-factory';
|
||||
import * as bodyParser from 'body-parser';
|
||||
import {
|
||||
json as bodyParserJson,
|
||||
urlencoded as bodyParserUrlencoded,
|
||||
} from 'body-parser';
|
||||
import * as cors from 'cors';
|
||||
import * as express from 'express';
|
||||
import * as http from 'http';
|
||||
@@ -162,8 +165,8 @@ export class ExpressAdapter extends AbstractHttpAdapter {
|
||||
|
||||
public registerParserMiddleware() {
|
||||
const parserMiddleware = {
|
||||
jsonParser: bodyParser.json(),
|
||||
urlencodedParser: bodyParser.urlencoded({ extended: true }),
|
||||
jsonParser: bodyParserJson(),
|
||||
urlencodedParser: bodyParserUrlencoded({ extended: true }),
|
||||
};
|
||||
Object.keys(parserMiddleware)
|
||||
.filter(parser => !this.isMiddlewareApplied(parser))
|
||||
|
||||
@@ -8,7 +8,7 @@ import {
|
||||
} from '@nestjs/websockets/constants';
|
||||
import { MessageMappingProperties } from '@nestjs/websockets/gateway-metadata-explorer';
|
||||
import * as http from 'http';
|
||||
import { EMPTY as empty, fromEvent, Observable } from 'rxjs';
|
||||
import { EMPTY, fromEvent, Observable } from 'rxjs';
|
||||
import { filter, first, mergeMap, share, takeUntil } from 'rxjs/operators';
|
||||
|
||||
let wsPackage: any = {};
|
||||
@@ -133,7 +133,7 @@ export class WsAdapter extends AbstractWsAdapter {
|
||||
const { callback } = messageHandler;
|
||||
return transform(callback(message.data));
|
||||
} catch {
|
||||
return empty;
|
||||
return EMPTY;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import { isFunction } from '@nestjs/common/utils/shared.utils';
|
||||
import { ExecutionContextHost } from '@nestjs/core/helpers/execution-context-host';
|
||||
import { empty } from 'rxjs';
|
||||
import { EMPTY, isObservable } from 'rxjs';
|
||||
import { catchError } from 'rxjs/operators';
|
||||
import { WsExceptionsHandler } from '../exceptions/ws-exceptions-handler';
|
||||
|
||||
@@ -12,12 +11,12 @@ export class WsProxy {
|
||||
return async (...args: unknown[]) => {
|
||||
try {
|
||||
const result = await targetCallback(...args);
|
||||
return !this.isObservable(result)
|
||||
return !isObservable(result)
|
||||
? result
|
||||
: result.pipe(
|
||||
catchError(error => {
|
||||
this.handleError(exceptionsHandler, args, error);
|
||||
return empty();
|
||||
return EMPTY;
|
||||
}),
|
||||
);
|
||||
} catch (error) {
|
||||
@@ -35,8 +34,4 @@ export class WsProxy {
|
||||
host.setType('ws');
|
||||
exceptionsHandler.handle(error, host);
|
||||
}
|
||||
|
||||
isObservable(result: any): boolean {
|
||||
return result && isFunction(result.subscribe);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { expect } from 'chai';
|
||||
import { of, throwError } from 'rxjs';
|
||||
import { throwError } from 'rxjs';
|
||||
import * as sinon from 'sinon';
|
||||
import { WsProxy } from '../../context/ws-proxy';
|
||||
import { WsException } from '../../errors/ws-exception';
|
||||
@@ -40,17 +40,4 @@ describe('WsProxy', () => {
|
||||
(await proxy(null, null)).subscribe(null, () => expectation.verify());
|
||||
});
|
||||
});
|
||||
|
||||
describe('isObservable', () => {
|
||||
describe('when observable', () => {
|
||||
it('should return true', () => {
|
||||
expect(routerProxy.isObservable(of('test'))).to.be.true;
|
||||
});
|
||||
});
|
||||
describe('when not observable', () => {
|
||||
it('should return false', () => {
|
||||
expect(routerProxy.isObservable({})).to.be.false;
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user