fix(microservices): pull kafka retriable error from all handlers, lazy throw

This commit is contained in:
Kamil Myśliwiec
2023-02-02 15:39:45 +01:00
parent 9eb3b8903a
commit 43d0c166d4
3 changed files with 71 additions and 36 deletions

View File

@@ -14,7 +14,15 @@ import { Module } from '@nestjs/core/injector/module';
import { GraphInspector } from '@nestjs/core/inspector/graph-inspector';
import { MetadataScanner } from '@nestjs/core/metadata-scanner';
import { REQUEST_CONTEXT_ID } from '@nestjs/core/router/request/request-constants';
import { connectable, Observable, Subject } from 'rxjs';
import {
forkJoin,
from as fromPromise,
isObservable,
mergeMap,
Observable,
ObservedValueOf,
of,
} from 'rxjs';
import { IClientProxyFactory } from './client/client-proxy-factory';
import { ClientsContainer } from './container';
import { ExceptionFiltersContext } from './context/exception-filters-context';
@@ -111,21 +119,7 @@ export class ListenersController {
defaultCallMetadata,
);
if (isEventHandler) {
const eventHandler: MessageHandler = (...args: unknown[]) => {
const originalArgs = args;
const [dataOrContextHost] = originalArgs;
if (dataOrContextHost instanceof RequestContextHost) {
args = args.slice(1, args.length);
}
const originalReturnValue = proxy(...args);
const returnedValueWrapper = eventHandler.next?.(
...(originalArgs as Parameters<MessageHandler>),
);
returnedValueWrapper?.then(returnedValue =>
this.connectIfStream(returnedValue as Observable<unknown>),
);
return originalReturnValue;
};
const eventHandler = this.createEventHandlerCallback(proxy);
return server.addHandler(
pattern,
eventHandler,
@@ -143,6 +137,7 @@ export class ListenersController {
moduleKey,
methodKey,
defaultCallMetadata,
isEventHandler,
);
server.addHandler(pattern, asyncHandler, isEventHandler, extras);
});
@@ -174,6 +169,31 @@ export class ListenersController {
);
}
public createEventHandlerCallback(
proxy: (...args: unknown[]) => Observable<any> | Promise<any>,
): MessageHandler {
const eventHandler: MessageHandler = async (...args: unknown[]) => {
const originalArgs = args;
const [dataOrContextHost] = originalArgs;
if (dataOrContextHost instanceof RequestContextHost) {
args = args.slice(1, args.length);
}
const originalReturnValue = proxy(...args);
if (eventHandler.next) {
const returnedValueWrapper = eventHandler.next(
...(originalArgs as Parameters<MessageHandler>),
);
return forkJoin({
current: this.transformToObservable(originalReturnValue),
next: this.transformToObservable(returnedValueWrapper),
});
}
return originalReturnValue;
};
return eventHandler;
}
public assignClientsToProperties(instance: Controller | Injectable) {
for (const {
property,
@@ -201,6 +221,7 @@ export class ListenersController {
moduleKey: string,
methodKey: string,
defaultCallMetadata: Record<string, any> = DEFAULT_CALLBACK_METADATA,
isEventHandler = false,
) {
const collection = moduleRef.controllers;
const { instance } = wrapper;
@@ -225,6 +246,7 @@ export class ListenersController {
contextId = this.getContextId(request, isTreeDurable);
dataOrContextHost = request;
}
const contextInstance = await this.injector.loadPerContext(
instance,
moduleRef,
@@ -240,14 +262,14 @@ export class ListenersController {
wrapper.id,
defaultCallMetadata,
);
const returnedValueWrapper = requestScopedHandler.next?.(
dataOrContextHost,
...args,
);
returnedValueWrapper?.then(returnedValue =>
this.connectIfStream(returnedValue as Observable<unknown>),
);
return proxy(...args);
if (isEventHandler) {
const eventHandler: (...args: unknown[]) => unknown =
this.createEventHandlerCallback(proxy);
return eventHandler(...args);
} else {
return proxy(...args);
}
} catch (err) {
let exceptionFilter = this.exceptionFiltersCache.get(
instance[methodKey],
@@ -287,14 +309,25 @@ export class ListenersController {
return contextId;
}
private connectIfStream(source: Observable<unknown>) {
if (!source) {
return;
public transformToObservable<T>(
resultOrDeferred: Observable<T> | Promise<T>,
): Observable<T>;
public transformToObservable<T>(
resultOrDeferred: T,
): never extends Observable<ObservedValueOf<T>>
? Observable<T>
: Observable<ObservedValueOf<T>>;
public transformToObservable(resultOrDeferred: any) {
if (resultOrDeferred instanceof Promise) {
return fromPromise(resultOrDeferred).pipe(
mergeMap(val => (isObservable(val) ? val : of(val))),
);
}
const connectableSource = connectable(source, {
connector: () => new Subject(),
resetOnDisconnect: false,
});
connectableSource.connect();
if (isObservable(resultOrDeferred)) {
return resultOrDeferred;
}
return of(resultOrDeferred);
}
}

View File

@@ -191,7 +191,7 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
}
const response$ = this.transformToObservable(
await handler(packet.data, kafkaContext),
handler(packet.data, kafkaContext),
);
const replayStream$ = new ReplaySubject();
@@ -219,7 +219,7 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
isPromiseResolved = true;
reject(err);
} else {
resolve()
resolve();
}
replayStream$.error(err);
},

View File

@@ -11,7 +11,7 @@ import {
Subject,
Subscription,
} from 'rxjs';
import { catchError, finalize } from 'rxjs/operators';
import { catchError, finalize, mergeMap } from 'rxjs/operators';
import { NO_EVENT_HANDLER } from '../constants';
import { BaseRpcContext } from '../ctx-host/base-rpc.context';
import { IncomingRequestDeserializer } from '../deserializers/incoming-request.deserializer';
@@ -133,7 +133,9 @@ export abstract class Server {
: Observable<ObservedValueOf<T>>;
public transformToObservable(resultOrDeferred: any) {
if (resultOrDeferred instanceof Promise) {
return fromPromise(resultOrDeferred);
return fromPromise(resultOrDeferred).pipe(
mergeMap(val => (isObservable(val) ? val : of(val))),
);
}
if (isObservable(resultOrDeferred)) {