Started work on the implementation on the kafka message pattern.

This commit is contained in:
Michael Kaufman
2019-08-15 13:39:03 -04:00
parent 32e07c3ede
commit 3e5a30d116
9 changed files with 187 additions and 24 deletions

View File

@@ -48,15 +48,40 @@ services:
zookeeper:
container_name: test-zookeeper
hostname: zookeeper
image: wurstmeister/zookeeper
image: confluentinc/cp-zookeeper:5.3.0
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
container_name: test-kafka
hostname: kafka
image: wurstmeister/kafka
image: confluentinc/cp-kafka:5.3.0
depends_on:
- zookeeper
ports:
- "29092:29092"
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
# schema-registry:
# container_name: test-schema-registry
# hostname: schema-registry
# image: confluentinc/cp-schema-registry:5.3.0
# depends_on:
# - zookeeper
# - kafka
# ports:
# - "8081:8081"
# environment:
# SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
# SCHEMA_REGISTRY_HOST_NAME: schema-registry
# SCHEMA_REGISTRY_LISTENERS: http://localhost:8081
# SCHEMA_REGISTRY_DEBUG: 'true'

View File

@@ -5,12 +5,15 @@ import {
EventPattern,
Transport,
} from '@nestjs/microservices';
import { Observable } from 'rxjs';
import { Logger } from '@nestjs/common/services/logger.service';
import * as util from 'util';
import { Observable } from 'rxjs';
import * as uuid from 'uuid';
@Controller()
export class KafkaController implements OnModuleInit {
protected readonly logger = new Logger(KafkaController.name);
static IS_NOTIFIED = false;
static MATH_SUM = 0;
@@ -29,8 +32,8 @@ export class KafkaController implements OnModuleInit {
}
private parse(data: any) {
data.message.key = data.message.key.toString();
data.message.value = JSON.parse(data.message.value.toString());
data.key = data.key.toString();
data.value = JSON.parse(data.value.toString());
}
@Post()
@@ -39,17 +42,28 @@ export class KafkaController implements OnModuleInit {
@Query('command') cmd,
@Body() data: number[],
): Promise<Observable<any>> {
return this.client.emit('math.sum', data.map((num) => {
const key = uuid.v4(); // stick to a single partition
const result = await this.client.emit('math.sum', data.map((num) => {
return {
key: uuid.v4(), // stick to a single partition
key,
value: num.toString(),
headers: {
'correlation-id': key,
},
};
}));
})).toPromise();
this.logger.error(util.format('@Query math.sum result %o', result));
return result;
}
@EventPattern('math.sum')
mathSum(data: any){
KafkaController.MATH_SUM += parseFloat(data.message.value);
this.logger.error(util.format('@EventPattern math.sum data %o', data));
KafkaController.MATH_SUM += parseFloat(data.value);
}
@Post('notify')
@@ -66,6 +80,6 @@ export class KafkaController implements OnModuleInit {
eventHandler(data: any) {
this.parse(data);
KafkaController.IS_NOTIFIED = data.message.value.notify;
KafkaController.IS_NOTIFIED = data.value.notify;
}
}

View File

@@ -82,7 +82,7 @@ export interface PartitionMetadata {
}
export interface IHeaders {
[key: string]: string;
[key: string]: Buffer;
}
export interface ConsumerConfig {

View File

@@ -1,6 +1,5 @@
import { Logger } from '@nestjs/common/services/logger.service';
import { loadPackage } from '@nestjs/common/utils/load-package.util';
import { EventEmitter } from 'events';
import { Observable } from 'rxjs';
import {
@@ -23,11 +22,7 @@ export class ClientKafka extends ClientProxy {
protected readonly logger = new Logger(ClientKafka.name);
private client: Kafka = null;
private producer: Producer = null;
protected channel: any = null;
protected urls: string[];
protected queue: string;
protected queueOptions: any;
protected responseEmitter: EventEmitter;
private readonly brokers: string[];
private readonly clientId: string;

View File

@@ -1 +1,2 @@
export * from './transport.enum';
export * from './kafka-headers.enum';

View File

@@ -0,0 +1,31 @@
export enum KafkaHeaders {
ACKNOWLEDGMENT = 'kafka_acknowledgment',
BATCH_CONVERTED_HEADERS = 'kafka_batchConvertedHeaders',
CONSUMER = 'kafka_consumer',
CORRELATION_ID = 'kafka_correlationId',
DLT_EXCEPTION_FQCN = 'kafka_dlt-exception-fqcn',
DLT_EXCEPTION_MESSAGE = 'kafka_dlt-exception-message',
DLT_EXCEPTION_STACKTRACE = 'kafka_dlt-exception-stacktrace',
DLT_ORIGINAL_OFFSET = 'kafka_dlt-original-offset',
DLT_ORIGINAL_PARTITION = 'kafka_dlt-original-partition',
DLT_ORIGINAL_TIMESTAMP = 'kafka_dlt-original-timestamp',
DLT_ORIGINAL_TIMESTAMP_TYPE = 'kafka_dlt-original-timestamp-type',
DLT_ORIGINAL_TOPIC = 'kafka_dlt-original-topic',
MESSAGE_KEY = 'kafka_messageKey',
NATIVE_HEADERS = 'kafka_nativeHeaders',
OFFSET = 'kafka_offset',
PARTITION_ID = 'kafka_partitionId',
PREFIX = 'kafka_',
RAW_DATA = 'kafka_data',
RECEIVED = 'kafka_received',
RECEIVED_MESSAGE_KEY = 'kafka_receivedMessageKey',
RECEIVED_PARTITION_ID = 'kafka_receivedPartitionId',
RECEIVED_TIMESTAMP = 'kafka_receivedTimestamp',
RECEIVED_TOPIC = 'kafka_receivedTopic',
RECORD_METADATA = 'kafka_recordMetadata',
REPLY_PARTITION = 'kafka_replyPartition',
REPLY_TOPIC = 'kafka_replyTopic',
TIMESTAMP = 'kafka_timestamp',
TIMESTAMP_TYPE = 'kafka_timestampType',
TOPIC = 'kafka_topic',
}

View File

@@ -82,7 +82,7 @@ export interface PartitionMetadata {
}
export interface IHeaders {
[key: string]: string;
[key: string]: Buffer;
}
export interface ConsumerConfig {

View File

@@ -7,4 +7,4 @@ export * from './microservice-configuration.interface';
export * from './packet.interface';
export * from './pattern-metadata.interface';
export * from './request-context.interface';
export * from './pattern.interface';
export * from './pattern.interface';

View File

@@ -1,3 +1,5 @@
import { isUndefined } from '@nestjs/common/utils/shared.utils';
import { Observable } from 'rxjs';
import { Logger } from '@nestjs/common/services/logger.service';
import {
KAFKA_DEFAULT_BROKER,
@@ -8,11 +10,21 @@ import {
KafkaConfig,
Kafka,
Consumer,
Producer,
EachMessagePayload,
KafkaMessage,
Message,
logLevel
} from '../external/kafka.interface';
import { CustomTransportStrategy, KafkaOptions } from '../interfaces';
import { CustomTransportStrategy, KafkaOptions, ReadPacket, PacketId } from '../interfaces';
import { KafkaHeaders } from '../enums';
import { Server } from './server';
import { partition } from 'rxjs/operators';
interface KafkaPacket {
replyTopic?: string;
replyPartition?: number;
}
let kafkaPackage: any = {};
@@ -20,6 +32,7 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
protected readonly logger = new Logger(ServerKafka.name);
private client: Kafka = null;
private consumer: Consumer = null;
private producer: Producer = null;
private readonly brokers: string[];
private readonly clientId: string;
private readonly groupId: string;
@@ -40,16 +53,22 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
public close(): void {
this.consumer && this.consumer.disconnect();
this.producer && this.producer.disconnect();
this.consumer = null;
this.producer = null;
this.client = null;
}
public async start(callback: () => void): Promise<void> {
// create consumer and producer
this.consumer = this.client.consumer(Object.assign(this.options.consumer || {}, {
groupId: this.groupId
}));
this.producer = this.client.producer(this.options.producer);
await this.consumer.connect();
await this.producer.connect();
await this.bindEvents(this.consumer);
callback();
}
@@ -109,9 +128,87 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
public async handleMessage(
payload: EachMessagePayload
) {
return this.handleEvent(payload.topic, {
const packet = this.deserialize(payload);
const handler = this.getHandlerByPattern(packet.pattern);
if (handler.isEventHandler) {
return this.handleEvent(packet.pattern, packet);
}
// message handlers need at least a correlation id and a reply topic
if (isUndefined(packet.id) || isUndefined(packet.replyTopic)) {
throw new Error('Messaging not available');
}
const response$ = this.transformToObservable(
await handler(packet.data),
) as Observable<any>;
const publish = <T>(data: T) =>
this.sendMessage(
data as T & Message,
packet.replyTopic,
packet.replyPartition,
packet.id
);
response$ && this.send(response$, publish);
}
private deserialize(payload: EachMessagePayload): KafkaPacket & ReadPacket<Message> & PacketId {
// build
const packet = {
id: undefined,
replyTopic: undefined,
replyPartition: undefined,
pattern: payload.topic,
data: payload
data: Object.assign(payload.message, {
topic: payload.topic,
partition: payload.partition
})
};
// parse the correlation id
if (!isUndefined(packet.data.headers[KafkaHeaders.CORRELATION_ID])) {
// assign the correlation id as the packet id
packet.id = packet.data.headers[KafkaHeaders.CORRELATION_ID].toString();
// parse the topic and partition
if (!isUndefined(packet.data.headers[KafkaHeaders.REPLY_TOPIC])) {
packet.replyTopic = packet.data.headers[KafkaHeaders.REPLY_TOPIC].toString();
}
if (!isUndefined(packet.data.headers[KafkaHeaders.REPLY_PARTITION])) {
packet.replyPartition = parseFloat(packet.data.headers[KafkaHeaders.REPLY_PARTITION].toString());
}
}
return packet;
}
public sendMessage<T = any>(
message: T & Message,
replyTopic: string,
replyPartition: number,
correlationId: string
): void {
// assign partition
message = Object.assign(message, {
partition: replyPartition || undefined
});
// create headers if they don't exist
if (isUndefined(message.headers)) {
message.headers = {};
}
// assign the correlation id
message.headers[KafkaHeaders.CORRELATION_ID] = Buffer.from(correlationId);
// send
this.producer.send(Object.assign({
topic: replyTopic,
messages: [message]
}, this.options.send || {}));
}
}