KafkaParser options to keep binary payloads without utf8 encoding

This commit is contained in:
Ivan Alduan
2021-04-09 12:47:28 +02:00
parent b29714cb66
commit df911082aa
5 changed files with 28 additions and 12 deletions

8
package-lock.json generated
View File

@@ -6270,7 +6270,6 @@
"resolved": "https://registry.npmjs.org/apollo-env/-/apollo-env-0.6.6.tgz",
"integrity": "sha512-hXI9PjJtzmD34XviBU+4sPMOxnifYrHVmxpjykqI/dUD2G3yTiuRaiQqwRwB2RCdwC1Ug/jBfoQ/NHDTnnjndQ==",
"dev": true,
"optional": true,
"requires": {
"@types/node-fetch": "2.5.7",
"core-js": "^3.0.1",
@@ -11482,8 +11481,7 @@
"version": "0.0.10",
"resolved": "https://registry.npmjs.org/cssfilter/-/cssfilter-0.0.10.tgz",
"integrity": "sha1-xtJnJjKi5cg+AT5oZKQs6N79IK4=",
"dev": true,
"optional": true
"dev": true
},
"csv-parse": {
"version": "4.15.3",
@@ -27111,7 +27109,6 @@
"resolved": "https://registry.npmjs.org/xss/-/xss-1.0.8.tgz",
"integrity": "sha512-3MgPdaXV8rfQ/pNn16Eio6VXYPTkqwa0vc7GkiymmY/DqR1SE/7VPAAVZz1GJsJFrllMYO3RHfEaiUGjab6TNw==",
"dev": true,
"optional": true,
"requires": {
"commander": "^2.20.3",
"cssfilter": "0.0.10"
@@ -27121,8 +27118,7 @@
"version": "2.20.3",
"resolved": "https://registry.npmjs.org/commander/-/commander-2.20.3.tgz",
"integrity": "sha512-GpVkmM8vF2vQUkj2LvZmD35JxeJOLCwJ9cUkugyk2nuhbv3+mJvpLYYt+0+USMxE+oj+ey/lJEnhZw75x/OMcQ==",
"dev": true,
"optional": true
"dev": true
}
}
},

View File

@@ -40,10 +40,11 @@ import { ClientProxy } from './client-proxy';
let kafkaPackage: any = {};
export class ClientKafka extends ClientProxy {
protected logger = new Logger(ClientKafka.name);
protected client: Kafka = null;
protected consumer: Consumer = null;
protected producer: Producer = null;
protected logger = new Logger(ClientKafka.name);
protected parser: KafkaParser = null;
protected responsePatterns: string[] = [];
protected consumerAssignments: { [key: string]: number } = {};
@@ -73,6 +74,8 @@ export class ClientKafka extends ClientProxy {
require('kafkajs'),
);
this.parser = new KafkaParser((options && options.parser) || undefined);
this.initializeSerializer(options);
this.initializeDeserializer(options);
}
@@ -153,7 +156,7 @@ export class ClientKafka extends ClientProxy {
public createResponseCallback(): (payload: EachMessagePayload) => any {
return (payload: EachMessagePayload) => {
const rawMessage = KafkaParser.parse<KafkaMessage>(
const rawMessage = this.parser.parse<KafkaMessage>(
Object.assign(payload.message, {
topic: payload.topic,
partition: payload.partition,

View File

@@ -1,8 +1,17 @@
import { isNil } from '@nestjs/common/utils/shared.utils';
import { KafkaParserConfig } from '../interfaces';
export class KafkaParser {
public static parse<T = any>(data: any): T {
protected readonly keepBinary: boolean;
constructor(config?: KafkaParserConfig) {
this.keepBinary = (config && config.keepBinary) || false;
}
public parse<T = any>(data: any): T {
if (!this.keepBinary) {
data.value = this.decode(data.value);
}
if (!isNil(data.key)) {
data.key = this.decode(data.key);
@@ -18,7 +27,7 @@ export class KafkaParser {
return data;
}
public static decode(value: Buffer): object | string | null {
public decode(value: Buffer): object | string | null {
if (isNil(value)) {
return null;
}

View File

@@ -157,6 +157,10 @@ export interface RmqOptions {
};
}
export interface KafkaParserConfig {
keepBinary?: boolean;
}
export interface KafkaOptions {
transport?: Transport.KAFKA;
options?: {
@@ -169,5 +173,6 @@ export interface KafkaOptions {
send?: Omit<ProducerRecord, 'topic' | 'messages'>;
serializer?: Serializer;
deserializer?: Deserializer;
parser?: KafkaParserConfig;
};
}

View File

@@ -39,6 +39,7 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
protected client: Kafka = null;
protected consumer: Consumer = null;
protected producer: Producer = null;
protected parser: KafkaParser = null;
protected brokers: string[] | BrokersFunction;
protected clientId: string;
@@ -66,6 +67,8 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
require('kafkajs'),
);
this.parser = new KafkaParser((options && options.parser) || undefined);
this.initializeSerializer(options);
this.initializeDeserializer(options);
}
@@ -137,7 +140,7 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
public async handleMessage(payload: EachMessagePayload) {
const channel = payload.topic;
const rawMessage = KafkaParser.parse<KafkaMessage>(
const rawMessage = this.parser.parse<KafkaMessage>(
Object.assign(payload.message, {
topic: payload.topic,
partition: payload.partition,