// Mirror of https://github.com/nestjs/nest.git (synced 2026-02-21 23:11:44 +00:00).
// 222 lines · 6.2 KiB · TypeScript
import { Logger, LoggerService } from '@nestjs/common/services/logger.service';
|
|
import { loadPackage } from '@nestjs/common/utils/load-package.util';
|
|
import {
|
|
connectable,
|
|
EMPTY,
|
|
from as fromPromise,
|
|
isObservable,
|
|
Observable,
|
|
ObservedValueOf,
|
|
of,
|
|
Subject,
|
|
Subscription,
|
|
} from 'rxjs';
|
|
import { catchError, finalize, mergeMap } from 'rxjs/operators';
|
|
import { NO_EVENT_HANDLER } from '../constants';
|
|
import { BaseRpcContext } from '../ctx-host/base-rpc.context';
|
|
import { IncomingRequestDeserializer } from '../deserializers/incoming-request.deserializer';
|
|
import {
|
|
ClientOptions,
|
|
KafkaOptions,
|
|
MessageHandler,
|
|
MicroserviceOptions,
|
|
MqttOptions,
|
|
MsPattern,
|
|
NatsOptions,
|
|
ReadPacket,
|
|
RedisOptions,
|
|
RmqOptions,
|
|
TcpOptions,
|
|
WritePacket,
|
|
} from '../interfaces';
|
|
import { ConsumerDeserializer } from '../interfaces/deserializer.interface';
|
|
import { ConsumerSerializer } from '../interfaces/serializer.interface';
|
|
import { IdentitySerializer } from '../serializers/identity.serializer';
|
|
import { transformPatternToRoute } from '../utils';
|
|
|
|
/**
 * Base class for microservice transport servers.
 *
 * It owns the registry of message/event handlers (keyed by normalized
 * pattern) and offers helpers that concrete transports build on: dispatching
 * events, streaming a handler's result back through a `respond` callback,
 * converting handler results to observables, and selecting the serializer and
 * deserializer from the transport options.
 *
 * @publicApi
 */
export abstract class Server {
  /** Registered handlers, keyed by the route string returned by `normalizePattern`. */
  protected readonly messageHandlers = new Map<string, MessageHandler>();
  protected readonly logger: LoggerService = new Logger(Server.name);
  /** Assigned by `initializeSerializer`. */
  protected serializer: ConsumerSerializer;
  /** Assigned by `initializeDeserializer`. */
  protected deserializer: ConsumerDeserializer;

  /**
   * Registers a handler for the given pattern.
   *
   * When `isEventHandler` is true and a handler already exists for the same
   * normalized pattern, the new handler is appended to the end of the existing
   * handler's `next` chain. In every other case the handler is stored under
   * the pattern, replacing whatever was registered before.
   *
   * @param pattern - pattern to listen on; normalized with `normalizePattern`
   * @param callback - handler to register; `isEventHandler` and `extras` are
   * written onto it
   * @param isEventHandler - whether the handler is an event handler
   * @param extras - extra metadata attached to the handler
   */
  public addHandler(
    pattern: any,
    callback: MessageHandler,
    isEventHandler = false,
    extras: Record<string, any> = {},
  ) {
    const normalizedPattern = this.normalizePattern(pattern);
    callback.isEventHandler = isEventHandler;
    callback.extras = extras;

    const head = isEventHandler
      ? this.messageHandlers.get(normalizedPattern)
      : undefined;
    if (!head) {
      this.messageHandlers.set(normalizedPattern, callback);
      return;
    }

    // Walk to the end of the chain so handlers keep their registration order.
    let tail: MessageHandler = head;
    while (tail.next) {
      tail = tail.next;
    }
    tail.next = callback;
  }

  /**
   * @returns the live handler registry (not a copy)
   */
  public getHandlers(): Map<string, MessageHandler> {
    return this.messageHandlers;
  }

  /**
   * Looks up the handler registered for a pattern.
   *
   * @param pattern - pattern as received from the transport; converted to a
   * route with `getRouteFromPattern`
   * @returns the registered handler, or `null` when there is none
   */
  public getHandlerByPattern(pattern: string): MessageHandler | null {
    const route = this.getRouteFromPattern(pattern);
    return this.messageHandlers.get(route) ?? null;
  }

  /**
   * Subscribes to `stream$` and forwards everything it produces to `respond`
   * as write packets: `{ response }` for each value, `{ err }` for an error,
   * and an `isDisposed: true` marker when the stream finishes.
   *
   * Packets are queued and flushed on the next tick, one at a time and in
   * order, awaiting each `respond` call. The dispose marker is merged into the
   * last packet that has not been sent yet; if nothing is waiting it is sent
   * as a packet of its own.
   *
   * @param stream$ - source of handler results
   * @param respond - transport callback that writes a single packet
   * @returns the subscription to `stream$`
   */
  public send(
    stream$: Observable<any>,
    respond: (data: WritePacket) => unknown | Promise<unknown>,
  ): Subscription {
    const pendingPackets: WritePacket[] = [];
    let isFlushing = false;

    const flush = async () => {
      try {
        // Drain the live queue with shift() so packets enqueued while a
        // `respond` call is still pending are picked up instead of being lost.
        let packet: WritePacket | undefined;
        while ((packet = pendingPackets.shift()) !== undefined) {
          await respond(packet);
        }
      } finally {
        // Always release the flag, otherwise one failing `respond` would
        // prevent every later packet from ever being flushed.
        isFlushing = false;
      }
    };

    const scheduleOnNextTick = (data: WritePacket) => {
      if (data.isDisposed && pendingPackets.length > 0) {
        // Piggyback the dispose marker on the last unsent packet.
        pendingPackets[pendingPackets.length - 1].isDisposed = true;
      } else {
        pendingPackets.push(data);
      }
      if (!isFlushing) {
        isFlushing = true;
        process.nextTick(flush);
      }
    };

    return stream$
      .pipe(
        catchError((err: any) => {
          scheduleOnNextTick({ err });
          return EMPTY;
        }),
        finalize(() => scheduleOnNextTick({ isDisposed: true })),
      )
      .subscribe((response: any) => scheduleOnNextTick({ response }));
  }

  /**
   * Dispatches an incoming event to the handler registered for `pattern`.
   *
   * Logs an error when no handler matches. If the handler returns an
   * observable, it is connected through a `Subject` so the stream is started
   * even though nothing consumes its values here.
   *
   * @param pattern - event pattern as received from the transport
   * @param packet - incoming packet; only `packet.data` is passed on
   * @param context - transport-specific context passed to the handler
   */
  public async handleEvent(
    pattern: string,
    packet: ReadPacket,
    context: BaseRpcContext,
  ): Promise<any> {
    const handler = this.getHandlerByPattern(pattern);
    if (!handler) {
      return this.logger.error(NO_EVENT_HANDLER`${pattern}`);
    }
    const resultOrStream = await handler(packet.data, context);
    if (isObservable(resultOrStream)) {
      const connectableSource = connectable(resultOrStream, {
        connector: () => new Subject(),
        resetOnDisconnect: false,
      });
      connectableSource.connect();
    }
  }

  /**
   * Normalizes a handler result to an observable.
   *
   * - a `Promise` becomes an observable of its resolved value; if that value
   *   is itself an observable, it is flattened into the result
   * - an observable is returned unchanged
   * - any other value is wrapped with `of`
   *
   * @param resultOrDeferred - value, promise or observable returned by a handler
   */
  public transformToObservable<T>(
    resultOrDeferred: Observable<T> | Promise<T>,
  ): Observable<T>;
  public transformToObservable<T>(
    resultOrDeferred: T,
  ): never extends Observable<ObservedValueOf<T>>
    ? Observable<T>
    : Observable<ObservedValueOf<T>>;
  public transformToObservable(resultOrDeferred: any) {
    if (resultOrDeferred instanceof Promise) {
      return fromPromise(resultOrDeferred).pipe(
        mergeMap(val => (isObservable(val) ? val : of(val))),
      );
    }

    if (isObservable(resultOrDeferred)) {
      return resultOrDeferred;
    }

    return of(resultOrDeferred);
  }

  /**
   * Reads a property from an options object.
   *
   * @param obj - options object; may be null or undefined
   * @param prop - property name to read
   * @param defaultValue - returned when `obj` is falsy or does not contain
   * `prop`; a property that is present is returned as is, even if `undefined`
   */
  public getOptionsProp<
    T extends MicroserviceOptions['options'],
    K extends keyof T,
  >(obj: T, prop: K, defaultValue: T[K] = undefined) {
    return obj && prop in obj ? obj[prop] : defaultValue;
  }

  /**
   * Logs the given error message through the server logger.
   *
   * @param error - message to log
   */
  protected handleError(error: string) {
    this.logger.error(error);
  }

  /**
   * Thin wrapper around the `loadPackage` utility, so subclasses can load
   * their transport driver through the server instance.
   *
   * @param name - package name
   * @param ctx - context string forwarded to the utility
   * @param loader - optional loader function forwarded to the utility
   */
  protected loadPackage<T = any>(
    name: string,
    ctx: string,
    loader?: Function,
  ): T {
    return loadPackage(name, ctx, loader);
  }

  /**
   * Sets `this.serializer` to `options.serializer` when provided, otherwise to
   * a new `IdentitySerializer`.
   *
   * @param options - transport options
   */
  protected initializeSerializer(options: ClientOptions['options']) {
    this.serializer =
      (options &&
        (
          options as
            | RedisOptions['options']
            | NatsOptions['options']
            | MqttOptions['options']
            | TcpOptions['options']
            | RmqOptions['options']
            | KafkaOptions['options']
        ).serializer) ||
      new IdentitySerializer();
  }

  /**
   * Sets `this.deserializer` to `options.deserializer` when provided,
   * otherwise to a new `IncomingRequestDeserializer`.
   *
   * @param options - transport options
   */
  protected initializeDeserializer(options: ClientOptions['options']) {
    this.deserializer =
      (options &&
        (
          options as
            | RedisOptions['options']
            | NatsOptions['options']
            | MqttOptions['options']
            | TcpOptions['options']
            | RmqOptions['options']
            | KafkaOptions['options']
        ).deserializer) ||
      new IncomingRequestDeserializer();
  }

  /**
   * Converts a pattern received from the transport into a route.
   *
   * The pattern is parsed as JSON when possible; if parsing fails, the raw
   * string is used as the pattern. The result is then normalized.
   *
   * @param pattern - server pattern
   * @returns the route for the pattern
   */
  protected getRouteFromPattern(pattern: string): string {
    let validPattern: MsPattern;

    try {
      validPattern = JSON.parse(pattern);
    } catch {
      // Not valid JSON: use the `pattern` string as is, without any conversion.
      validPattern = pattern;
    }
    return this.normalizePattern(validPattern);
  }

  /**
   * Turns a pattern into its route string via `transformPatternToRoute`.
   *
   * @param pattern - pattern to normalize
   */
  protected normalizePattern(pattern: MsPattern): string {
    return transformPatternToRoute(pattern);
  }
}
|