chore(): resolve conflicts

This commit is contained in:
Kamil Myśliwiec
2021-06-29 14:32:00 +02:00
5 changed files with 40 additions and 14 deletions

View File

@@ -39,6 +39,7 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
protected client: Kafka = null;
protected consumer: Consumer = null;
protected producer: Producer = null;
protected parser: KafkaParser = null;
protected brokers: string[] | BrokersFunction;
protected clientId: string;
@@ -66,6 +67,8 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
require('kafkajs'),
);
this.parser = new KafkaParser((options && options.parser) || undefined);
this.initializeSerializer(options);
this.initializeDeserializer(options);
}
@@ -143,7 +146,7 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
public async handleMessage(payload: EachMessagePayload) {
const channel = payload.topic;
const rawMessage = KafkaParser.parse<KafkaMessage>(
const rawMessage = this.parser.parse<KafkaMessage>(
Object.assign(payload.message, {
topic: payload.topic,
partition: payload.partition,