feat(microservices): add status, unwrap, on, and other features

This commit is contained in:
Kamil Myśliwiec
2024-11-15 10:03:05 +01:00
parent bc4667c15a
commit 3dfc7fc68e
62 changed files with 2114 additions and 852 deletions

View File

@@ -11,6 +11,7 @@ import {
import { KafkaContext } from '../ctx-host';
import { KafkaRequestDeserializer } from '../deserializers/kafka-request.deserializer';
import { KafkaHeaders, Transport } from '../enums';
import { KafkaStatus } from '../events';
import { KafkaRetriableException } from '../exceptions';
import {
BrokersFunction,
@@ -25,12 +26,7 @@ import {
RecordMetadata,
} from '../external/kafka.interface';
import { KafkaLogger, KafkaParser } from '../helpers';
import {
CustomTransportStrategy,
KafkaOptions,
OutgoingResponse,
ReadPacket,
} from '../interfaces';
import { KafkaOptions, OutgoingResponse, ReadPacket } from '../interfaces';
import { KafkaRequestSerializer } from '../serializers/kafka-request.serializer';
import { Server } from './server';
@@ -39,7 +35,7 @@ let kafkaPackage: any = {};
/**
* @publicApi
*/
export class ServerKafka extends Server implements CustomTransportStrategy {
export class ServerKafka extends Server<never, KafkaStatus> {
public readonly transportId = Transport.KAFKA;
protected logger = new Logger(ServerKafka.name);
@@ -47,7 +43,6 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
protected consumer: Consumer = null;
protected producer: Producer = null;
protected parser: KafkaParser = null;
protected brokers: string[] | BrokersFunction;
protected clientId: string;
protected groupId: string;
@@ -64,7 +59,7 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
this.brokers = clientOptions.brokers || [KAFKA_DEFAULT_BROKER];
// append a unique id to the clientId and groupId
// Append a unique id to the clientId and groupId
// so they don't collide with a microservices client
this.clientId =
(clientOptions.clientId || KAFKA_DEFAULT_CLIENT) + postfixId;
@@ -105,6 +100,8 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
});
this.consumer = this.client.consumer(consumerOptions);
this.producer = this.client.producer(this.options.producer);
this.registerConsumerEventListeners();
this.registerProducerEventListeners();
await this.consumer.connect();
await this.producer.connect();
@@ -112,6 +109,33 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
callback();
}
protected registerConsumerEventListeners() {
this.consumer.on(this.consumer.events.CONNECT, () =>
this._status$.next(KafkaStatus.CONNECTED),
);
this.consumer.on(this.consumer.events.DISCONNECT, () =>
this._status$.next(KafkaStatus.DISCONNECTED),
);
this.consumer.on(this.consumer.events.REBALANCING, () =>
this._status$.next(KafkaStatus.REBALANCING),
);
this.consumer.on(this.consumer.events.STOP, () =>
this._status$.next(KafkaStatus.STOPPED),
);
this.consumer.on(this.consumer.events.CRASH, () =>
this._status$.next(KafkaStatus.CRASHED),
);
}
protected registerProducerEventListeners() {
this.producer.on(this.producer.events.CONNECT, () =>
this._status$.next(KafkaStatus.CONNECTED),
);
this.producer.on(this.producer.events.DISCONNECT, () =>
this._status$.next(KafkaStatus.DISCONNECTED),
);
}
public createClient<T = any>(): T {
return new kafkaPackage.Kafka(
Object.assign(
@@ -204,6 +228,22 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
this.send(replayStream$, publish);
}
public unwrap<T>(): T {
if (!this.client) {
throw new Error(
'Not initialized. Please call the "listen"/"startAllMicroservices" method before accessing the server.',
);
}
return this.client as T;
}
public on<
EventKey extends string | number | symbol = string | number | symbol,
EventCallback = any,
>(event: EventKey, callback: EventCallback) {
throw new Error('Method is not supported for Kafka server');
}
private combineStreamsAndThrowIfRetriable(
response$: Observable<any>,
replayStream$: ReplaySubject<unknown>,