import { Logger } from '@nestjs/common/services/logger.service'; import { isNil } from '@nestjs/common/utils/shared.utils'; import { isObservable, lastValueFrom, Observable, ReplaySubject } from 'rxjs'; import { KAFKA_DEFAULT_BROKER, KAFKA_DEFAULT_CLIENT, KAFKA_DEFAULT_GROUP, NO_EVENT_HANDLER, NO_MESSAGE_HANDLER, } from '../constants'; import { KafkaContext } from '../ctx-host'; import { KafkaRequestDeserializer } from '../deserializers/kafka-request.deserializer'; import { KafkaHeaders, Transport } from '../enums'; import { KafkaStatus } from '../events'; import { KafkaRetriableException } from '../exceptions'; import { BrokersFunction, Consumer, ConsumerConfig, EachMessagePayload, Kafka, KafkaConfig, KafkaMessage, Message, Producer, RecordMetadata, } from '../external/kafka.interface'; import { KafkaLogger, KafkaParser } from '../helpers'; import { KafkaOptions, OutgoingResponse, ReadPacket } from '../interfaces'; import { KafkaRequestSerializer } from '../serializers/kafka-request.serializer'; import { Server } from './server'; let kafkaPackage: any = {}; /** * @publicApi */ export class ServerKafka extends Server { public readonly transportId = Transport.KAFKA; protected logger = new Logger(ServerKafka.name); protected client: Kafka | null = null; protected consumer: Consumer | null = null; protected producer: Producer | null = null; protected parser: KafkaParser | null = null; protected brokers: string[] | BrokersFunction; protected clientId: string; protected groupId: string; constructor(protected readonly options: Required['options']) { super(); const clientOptions = this.getOptionsProp( this.options, 'client', {} as KafkaConfig, ); const consumerOptions = this.getOptionsProp( this.options, 'consumer', {} as ConsumerConfig, ); const postfixId = this.getOptionsProp(this.options, 'postfixId', '-server'); this.brokers = clientOptions.brokers || [KAFKA_DEFAULT_BROKER]; // Append a unique id to the clientId and groupId // so they don't collide with a microservices client this.clientId = (clientOptions.clientId || KAFKA_DEFAULT_CLIENT) + postfixId; this.groupId = (consumerOptions.groupId || KAFKA_DEFAULT_GROUP) + postfixId; kafkaPackage = this.loadPackage('kafkajs', ServerKafka.name, () => require('kafkajs'), ); this.parser = new KafkaParser((options && options.parser) || undefined); this.initializeSerializer(options); this.initializeDeserializer(options); } public async listen( callback: (err?: unknown, ...optionalParams: unknown[]) => void, ): Promise { try { this.client = this.createClient(); await this.start(callback); } catch (err) { callback(err); } } public async close(): Promise { this.consumer && (await this.consumer.disconnect()); this.producer && (await this.producer.disconnect()); this.consumer = null; this.producer = null; this.client = null; } public async start(callback: () => void): Promise { const consumerOptions = Object.assign(this.options.consumer || {}, { groupId: this.groupId, }); this.consumer = this.client!.consumer(consumerOptions); this.producer = this.client!.producer(this.options.producer); this.registerConsumerEventListeners(); this.registerProducerEventListeners(); await this.consumer.connect(); await this.producer.connect(); await this.bindEvents(this.consumer); callback(); } protected registerConsumerEventListeners() { if (!this.consumer) { return; } this.consumer.on(this.consumer.events.CONNECT, () => this._status$.next(KafkaStatus.CONNECTED), ); this.consumer.on(this.consumer.events.DISCONNECT, () => this._status$.next(KafkaStatus.DISCONNECTED), ); this.consumer.on(this.consumer.events.REBALANCING, () => this._status$.next(KafkaStatus.REBALANCING), ); this.consumer.on(this.consumer.events.STOP, () => this._status$.next(KafkaStatus.STOPPED), ); this.consumer.on(this.consumer.events.CRASH, () => this._status$.next(KafkaStatus.CRASHED), ); } protected registerProducerEventListeners() { if (!this.producer) { return; } this.producer.on(this.producer.events.CONNECT, () => this._status$.next(KafkaStatus.CONNECTED), ); this.producer.on(this.producer.events.DISCONNECT, () => this._status$.next(KafkaStatus.DISCONNECTED), ); } public createClient(): T { return new kafkaPackage.Kafka( Object.assign( { logCreator: KafkaLogger.bind(null, this.logger) }, this.options.client, { clientId: this.clientId, brokers: this.brokers }, ) as KafkaConfig, ); } public async bindEvents(consumer: Consumer) { const registeredPatterns = [...this.messageHandlers.keys()]; const consumerSubscribeOptions = this.options.subscribe || {}; if (registeredPatterns.length > 0) { await this.consumer!.subscribe({ ...consumerSubscribeOptions, topics: registeredPatterns, }); } const consumerRunOptions = Object.assign(this.options.run || {}, { eachMessage: this.getMessageHandler(), }); await consumer.run(consumerRunOptions); } public getMessageHandler() { return async (payload: EachMessagePayload) => this.handleMessage(payload); } public getPublisher( replyTopic: string, replyPartition: string, correlationId: string, ): (data: any) => Promise { return (data: any) => this.sendMessage(data, replyTopic, replyPartition, correlationId); } public async handleMessage(payload: EachMessagePayload) { const channel = payload.topic; const rawMessage = this.parser!.parse( Object.assign(payload.message, { topic: payload.topic, partition: payload.partition, }), ); const headers = rawMessage.headers as unknown as Record; const correlationId = headers[KafkaHeaders.CORRELATION_ID]; const replyTopic = headers[KafkaHeaders.REPLY_TOPIC]; const replyPartition = headers[KafkaHeaders.REPLY_PARTITION]; const packet = await this.deserializer.deserialize(rawMessage, { channel }); const kafkaContext = new KafkaContext([ rawMessage, payload.partition, payload.topic, this.consumer!, payload.heartbeat, this.producer!, ]); const handler = this.getHandlerByPattern(packet.pattern); // if the correlation id or reply topic is not set // then this is an event (events could still have correlation id) if (handler?.isEventHandler || !correlationId || !replyTopic) { return this.handleEvent(packet.pattern, packet, kafkaContext); } const publish = this.getPublisher( replyTopic, replyPartition, correlationId, ); if (!handler) { return publish({ id: correlationId, err: NO_MESSAGE_HANDLER, }); } const response$ = this.transformToObservable( handler(packet.data, kafkaContext), ); const replayStream$ = new ReplaySubject(); await this.combineStreamsAndThrowIfRetriable(response$, replayStream$); this.send(replayStream$, publish); } public unwrap(): T { if (!this.client) { throw new Error( 'Not initialized. Please call the "listen"/"startAllMicroservices" method before accessing the server.', ); } return this.client as T; } public on< EventKey extends string | number | symbol = string | number | symbol, EventCallback = any, >(event: EventKey, callback: EventCallback) { throw new Error('Method is not supported for Kafka server'); } private combineStreamsAndThrowIfRetriable( response$: Observable, replayStream$: ReplaySubject, ) { return new Promise((resolve, reject) => { let isPromiseResolved = false; response$.subscribe({ next: val => { replayStream$.next(val); if (!isPromiseResolved) { isPromiseResolved = true; resolve(); } }, error: err => { if (err instanceof KafkaRetriableException && !isPromiseResolved) { isPromiseResolved = true; reject(err); } else { resolve(); } replayStream$.error(err); }, complete: () => replayStream$.complete(), }); }); } public async sendMessage( message: OutgoingResponse, replyTopic: string, replyPartition: string, correlationId: string, ): Promise { const outgoingMessage = await this.serializer.serialize(message.response); this.assignReplyPartition(replyPartition, outgoingMessage); this.assignCorrelationIdHeader(correlationId, outgoingMessage); this.assignErrorHeader(message, outgoingMessage); this.assignIsDisposedHeader(message, outgoingMessage); const replyMessage = Object.assign( { topic: replyTopic, messages: [outgoingMessage], }, this.options.send || {}, ); return this.producer!.send(replyMessage); } public assignIsDisposedHeader( outgoingResponse: OutgoingResponse, outgoingMessage: Message, ) { if (!outgoingResponse.isDisposed) { return; } outgoingMessage.headers![KafkaHeaders.NEST_IS_DISPOSED] = Buffer.alloc(1); } public assignErrorHeader( outgoingResponse: OutgoingResponse, outgoingMessage: Message, ) { if (!outgoingResponse.err) { return; } const stringifiedError = typeof outgoingResponse.err === 'object' ? JSON.stringify(outgoingResponse.err) : outgoingResponse.err; outgoingMessage.headers![KafkaHeaders.NEST_ERR] = Buffer.from(stringifiedError); } public assignCorrelationIdHeader( correlationId: string, outgoingMessage: Message, ) { outgoingMessage.headers![KafkaHeaders.CORRELATION_ID] = Buffer.from(correlationId); } public assignReplyPartition( replyPartition: string, outgoingMessage: Message, ) { if (isNil(replyPartition)) { return; } outgoingMessage.partition = parseFloat(replyPartition); } public async handleEvent( pattern: string, packet: ReadPacket, context: KafkaContext, ): Promise { const handler = this.getHandlerByPattern(pattern); if (!handler) { return this.logger.error(NO_EVENT_HANDLER`${pattern}`); } const resultOrStream = await handler(packet.data, context); if (isObservable(resultOrStream)) { await lastValueFrom(resultOrStream); } } protected initializeSerializer(options: KafkaOptions['options']) { this.serializer = (options && options.serializer) || new KafkaRequestSerializer(); } protected initializeDeserializer(options: KafkaOptions['options']) { this.deserializer = options?.deserializer ?? new KafkaRequestDeserializer(); } }