feat(microservices): initial implementation of message builders

This commit is contained in:
Kamil Myśliwiec
2021-08-25 11:10:07 +02:00
parent a225632199
commit ed1f0b8bc9
10 changed files with 161 additions and 6 deletions

View File

@@ -11,6 +11,7 @@ import {
} from '../constants';
import { MqttClient } from '../external/mqtt-client.interface';
import { MqttOptions, ReadPacket, WritePacket } from '../interfaces';
import { MqttRecord, MqttRecordOptions } from '../records/mqtt.record';
import { ClientProxy } from './client-proxy';
let mqttPackage: any = {};
@@ -122,6 +123,10 @@ export class ClientMqtt extends ClientProxy {
callback: (packet: WritePacket) => any,
): () => void {
try {
const recordOptions = this.unwrapRecord<MqttRecordOptions>(
partialPacket,
MqttRecord,
);
const packet = this.assignPacketId(partialPacket);
const pattern = this.normalizePattern(partialPacket.pattern);
const serializedPacket = this.serializer.serialize(packet);
@@ -133,9 +138,11 @@ export class ClientMqtt extends ClientProxy {
subscriptionsCount = this.subscriptionsCount.get(responseChannel) || 0;
this.subscriptionsCount.set(responseChannel, subscriptionsCount + 1);
this.routingMap.set(packet.id, callback);
this.mqttClient.publish(
this.getRequestPattern(pattern),
JSON.stringify(serializedPacket),
recordOptions,
);
};
@@ -158,12 +165,19 @@ export class ClientMqtt extends ClientProxy {
}
protected dispatchEvent(packet: ReadPacket): Promise<any> {
const recordOptions = this.unwrapRecord<MqttRecordOptions>(
packet,
MqttRecord,
);
const pattern = this.normalizePattern(packet.pattern);
const serializedPacket = this.serializer.serialize(packet);
return new Promise<void>((resolve, reject) =>
this.mqttClient.publish(pattern, JSON.stringify(serializedPacket), err =>
err ? reject(err) : resolve(),
this.mqttClient.publish(
pattern,
JSON.stringify(serializedPacket),
recordOptions,
(err: any) => (err ? reject(err) : resolve()),
),
);
}

View File

@@ -4,6 +4,7 @@ import { NATS_DEFAULT_URL } from '../constants';
import { NatsResponseJSONDeserializer } from '../deserializers/nats-response-json.deserializer';
import { Client, NatsMsg } from '../external/nats-client.interface';
import { NatsOptions, PacketId, ReadPacket, WritePacket } from '../interfaces';
import { NatsRecord, NatsRecordOptions } from '../records/nats.record';
import { NatsJSONSerializer } from '../serializers/nats-json.serializer';
import { ClientProxy } from './client-proxy';
@@ -99,6 +100,10 @@ export class ClientNats extends ClientProxy {
callback: (packet: WritePacket) => any,
): () => void {
try {
const recordOptions = this.unwrapRecord<NatsRecordOptions>(
partialPacket,
NatsRecord,
);
const packet = this.assignPacketId(partialPacket);
const channel = this.normalizePattern(partialPacket.pattern);
const serializedPacket = this.serializer.serialize(packet);
@@ -114,6 +119,7 @@ export class ClientNats extends ClientProxy {
});
this.natsClient.publish(channel, serializedPacket, {
...recordOptions,
reply: inbox,
});
@@ -124,12 +130,16 @@ export class ClientNats extends ClientProxy {
}
protected dispatchEvent(packet: ReadPacket): Promise<any> {
const recordOptions = this.unwrapRecord<NatsRecordOptions>(
packet,
NatsRecord,
);
const pattern = this.normalizePattern(packet.pattern);
const serializedPacket = this.serializer.serialize(packet);
return new Promise<void>((resolve, reject) => {
try {
this.natsClient.publish(pattern, serializedPacket);
this.natsClient.publish(pattern, serializedPacket, recordOptions);
resolve();
} catch (err) {
reject(err);

View File

@@ -1,3 +1,4 @@
import { Type } from '@nestjs/common';
import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util';
import { isNil } from '@nestjs/common/utils/shared.utils';
import {
@@ -29,6 +30,7 @@ import {
} from '../interfaces';
import { ProducerDeserializer } from '../interfaces/deserializer.interface';
import { ProducerSerializer } from '../interfaces/serializer.interface';
import { RecordWrapper } from '../records/record-wrapper';
import { IdentitySerializer } from '../serializers/identity.serializer';
import { transformPatternToRoute } from '../utils';
@@ -166,4 +168,19 @@ export abstract class ClientProxy {
).deserializer) ||
new IncomingResponseDeserializer();
}
protected unwrapRecord<
TOptions,
TProto extends Type<RecordWrapper> = Type<RecordWrapper>,
>(packet: ReadPacket, recordTypeRef: TProto): TOptions {
const originalDataRef = packet.data;
if (
typeof originalDataRef === 'object' &&
originalDataRef instanceof recordTypeRef
) {
packet.data = originalDataRef.data;
return originalDataRef.options;
}
return {} as TOptions;
}
}

View File

@@ -18,6 +18,7 @@ import {
} from '../constants';
import { RmqUrl } from '../external/rmq-url.interface';
import { ReadPacket, RmqOptions, WritePacket } from '../interfaces';
import { RmqRecord, RmqRecordOptions } from '../records/rmq.record';
import { ClientProxy } from './client-proxy';
let rqmPackage: any = {};
@@ -185,6 +186,11 @@ export class ClientRMQ extends ClientProxy {
callback: (packet: WritePacket) => any,
): () => void {
try {
const recordOptions = this.unwrapRecord<RmqRecordOptions>(
message,
RmqRecord,
);
const correlationId = randomStringGenerator();
const listener = ({ content }: { content: any }) =>
this.handleMessage(JSON.parse(content.toString()), callback);
@@ -198,8 +204,9 @@ export class ClientRMQ extends ClientProxy {
Buffer.from(JSON.stringify(serializedPacket)),
{
replyTo: this.replyQueue,
correlationId,
persistent: this.persistent,
...recordOptions,
correlationId,
},
);
return () => this.responseEmitter.removeListener(correlationId, listener);
@@ -209,6 +216,10 @@ export class ClientRMQ extends ClientProxy {
}
protected dispatchEvent(packet: ReadPacket): Promise<any> {
const recordOptions = this.unwrapRecord<RmqRecordOptions>(
packet,
RmqRecord,
);
const serializedPacket = this.serializer.serialize(packet);
return new Promise<void>((resolve, reject) =>
@@ -217,6 +228,7 @@ export class ClientRMQ extends ClientProxy {
Buffer.from(JSON.stringify(serializedPacket)),
{
persistent: this.persistent,
...recordOptions,
},
(err: unknown) => (err ? reject(err) : resolve()),
),

View File

@@ -0,0 +1,3 @@
export * from './mqtt.record';
export * from './nats.record';
export * from './rmq.record';

View File

@@ -0,0 +1,47 @@
import { RecordWrapper } from './record-wrapper';
export interface MqttRecordOptions {
/**
* The QoS
*/
qos?: 0 | 1 | 2;
/**
* The retain flag
*/
retain?: boolean;
/**
* Whether or not mark a message as duplicate
*/
dup?: boolean;
/*
* MQTT 5.0 properties object
*/
properties?: {
payloadFormatIndicator?: number;
messageExpiryInterval?: number;
topicAlias?: string;
responseTopic?: string;
correlationData?: Buffer;
userProperties?: Record<string, string | string[]>;
subscriptionIdentifier?: number;
contentType?: string;
};
}
export class MqttRecord<T = any> extends RecordWrapper<T, MqttRecordOptions> {
setQoS(qos: MqttRecordOptions['qos']) {
this.updateOptions({ qos });
}
setRetain(retain: MqttRecordOptions['retain']) {
this.updateOptions({ retain });
}
setDup(dup: MqttRecordOptions['dup']) {
this.updateOptions({ dup });
}
setProperties(properties: MqttRecordOptions['properties']) {
this.updateOptions({ properties });
}
}

View File

@@ -0,0 +1,11 @@
import { RecordWrapper } from './record-wrapper';
export interface NatsRecordOptions {
headers: any;
}
export class NatsRecord<T = any> extends RecordWrapper<T, NatsRecordOptions> {
setHeaders(headers: NatsRecordOptions['headers']) {
this.updateOptions({ headers });
}
}

View File

@@ -0,0 +1,16 @@
export class RecordWrapper<TData = any, TOptions = any> {
private _options?: TOptions;
get options(): TOptions {
return { ...this._options };
}
constructor(public readonly data: TData) {}
protected updateOptions(options: TOptions) {
this._options = {
...this._options,
...options,
};
}
}

View File

@@ -0,0 +1,25 @@
import { RecordWrapper } from './record-wrapper';
export interface RmqRecordOptions {
expiration?: string | number;
userId?: string;
CC?: string | string[];
mandatory?: boolean;
persistent?: boolean;
deliveryMode?: boolean | number;
BCC?: string | string[];
contentType?: string;
contentEncoding?: string;
headers?: any;
priority?: number;
messageId?: string;
timestamp?: number;
type?: string;
appId?: string;
}
export class RmqRecord<T = any> extends RecordWrapper<T, RmqRecordOptions> {
setOptions(options: RmqRecordOptions) {
this.updateOptions(options);
}
}

View File

@@ -289,13 +289,13 @@ describe('ClientMqtt', () => {
});
it('should publish packet', async () => {
publishStub.callsFake((a, b, c) => c());
publishStub.callsFake((a, b, c, d) => d());
await client['dispatchEvent'](msg);
expect(publishStub.called).to.be.true;
});
it('should throw error', async () => {
publishStub.callsFake((a, b, c) => c(new Error()));
publishStub.callsFake((a, b, c, d) => d(new Error()));
client['dispatchEvent'](msg).catch(err =>
expect(err).to.be.instanceOf(Error),
);