chore(): resolve conflicts

This commit is contained in:
Kamil Myśliwiec
2021-08-25 11:13:37 +02:00
6 changed files with 214 additions and 35 deletions

View File

@@ -5,13 +5,16 @@ import { NatsResponseJSONDeserializer } from '../deserializers/nats-response-jso
import { Client, NatsMsg } from '../external/nats-client.interface';
import { NatsOptions, PacketId, ReadPacket, WritePacket } from '../interfaces';
import { NatsRecord, NatsRecordOptions } from '../records/nats.record';
import { NatsJSONSerializer } from '../serializers/nats-json.serializer';
import {
NatsRequest,
NatsRequestSerializer,
} from '../serializers/nats-request.serializer';
import { ClientProxy } from './client-proxy';
let natsPackage = {} as any;
export class ClientNats extends ClientProxy {
protected readonly logger = new Logger(ClientProxy.name);
protected readonly logger = new Logger(ClientNats.name);
protected natsClient: Client;
constructor(protected readonly options: NatsOptions['options']) {
@@ -106,7 +109,7 @@ export class ClientNats extends ClientProxy {
);
const packet = this.assignPacketId(partialPacket);
const channel = this.normalizePattern(partialPacket.pattern);
const serializedPacket = this.serializer.serialize(packet);
const serializedPacket: NatsRequest = this.serializer.serialize(packet);
const inbox = natsPackage.createInbox();
const subscriptionHandler = this.createSubscriptionHandler(
@@ -118,9 +121,9 @@ export class ClientNats extends ClientProxy {
callback: subscriptionHandler,
});
this.natsClient.publish(channel, serializedPacket, {
...recordOptions,
this.natsClient.publish(channel, serializedPacket.data, {
reply: inbox,
headers: serializedPacket.headers,
});
return () => subscription.unsubscribe();
@@ -135,11 +138,13 @@ export class ClientNats extends ClientProxy {
NatsRecord,
);
const pattern = this.normalizePattern(packet.pattern);
const serializedPacket = this.serializer.serialize(packet);
const serializedPacket: NatsRequest = this.serializer.serialize(packet);
return new Promise<void>((resolve, reject) => {
try {
this.natsClient.publish(pattern, serializedPacket, recordOptions);
this.natsClient.publish(pattern, serializedPacket.data, {
headers: serializedPacket.headers,
});
resolve();
} catch (err) {
reject(err);
@@ -148,7 +153,7 @@ export class ClientNats extends ClientProxy {
}
protected initializeSerializer(options: NatsOptions['options']) {
this.serializer = options?.serializer ?? new NatsJSONSerializer();
this.serializer = options?.serializer ?? new NatsRequestSerializer();
}
protected initializeDeserializer(options: NatsOptions['options']) {

View File

@@ -1,21 +0,0 @@
import { loadPackage } from '@nestjs/common/utils/load-package.util';
import { NatsCodec } from '../external/nats-client.interface';
import { Serializer } from '../interfaces/serializer.interface';
let natsPackage = {} as any;
export class NatsJSONSerializer implements Serializer {
private readonly jsonCodec: NatsCodec<unknown>;
constructor() {
natsPackage = loadPackage('nats', NatsJSONSerializer.name, () =>
require('nats'),
);
this.jsonCodec = natsPackage.JSONCodec();
}
serialize(value: Uint8Array) {
const json = this.jsonCodec.encode(value);
return json;
}
}

View File

@@ -0,0 +1,77 @@
import { loadPackage } from '@nestjs/common/utils/load-package.util';
import { NatsCodec } from '../external/nats-client.interface';
import { Serializer } from '../interfaces/serializer.interface';
import { MsgHdrs, headers as createHeaders } from 'nats';
import { ReadPacket } from '../interfaces';
let natsPackage = {} as any;
export interface NatsRequest {
data: Uint8Array;
headers?: MsgHdrs;
}
class NatsMessage {
constructor(
public readonly headers: MsgHdrs | undefined,
public readonly data: any,
) {}
}
export class NatsMessageBuilder<T extends any> {
private headers: MsgHdrs | undefined;
private data: T | undefined;
constructor(data: T | undefined = undefined) {
this.data = data;
}
public setHeaders(headers: MsgHdrs | undefined): NatsMessageBuilder<T> {
this.headers = headers;
return this;
}
public setPlainHeaders(
headers: Record<string, string>,
): NatsMessageBuilder<T> {
const natsHeaders = createHeaders();
for (const key in headers) {
if (headers.hasOwnProperty(key)) {
natsHeaders.set(key, headers[key]);
}
}
return this.setHeaders(natsHeaders);
}
public setData(data: T | undefined): NatsMessageBuilder<T> {
this.data = data;
return this;
}
public build(): NatsMessage {
return new NatsMessage(this.headers, this.data);
}
}
export class NatsRequestSerializer implements Serializer {
private readonly jsonCodec: NatsCodec<unknown>;
constructor() {
natsPackage = loadPackage('nats', NatsRequestSerializer.name, () =>
require('nats'),
);
this.jsonCodec = natsPackage.JSONCodec();
}
serialize(packet: ReadPacket | any): NatsRequest {
const natsMessage =
packet?.data instanceof NatsMessage
? packet.data as NatsMessage
: new NatsMessageBuilder(packet?.data).build();
return {
data: this.jsonCodec.encode({ ...packet, data: natsMessage.data }),
headers: natsMessage.headers,
};
}
}

View File

@@ -8,7 +8,10 @@ import { Client, NatsMsg } from '../external/nats-client.interface';
import { CustomTransportStrategy } from '../interfaces';
import { NatsOptions } from '../interfaces/microservice-configuration.interface';
import { IncomingRequest } from '../interfaces/packet.interface';
import { NatsJSONSerializer } from '../serializers/nats-json.serializer';
import {
NatsRequest,
NatsRequestSerializer,
} from '../serializers/nats-request.serializer';
import { Server } from './server';
let natsPackage = {} as any;
@@ -116,8 +119,11 @@ export class ServerNats extends Server implements CustomTransportStrategy {
if (natsMsg.reply) {
return (response: any) => {
Object.assign(response, { id });
const outgoingResponse = this.serializer.serialize(response);
return natsMsg.respond(outgoingResponse);
const outgoingResponse: NatsRequest =
this.serializer.serialize(response);
return natsMsg.respond(outgoingResponse.data, {
headers: outgoingResponse.headers,
});
};
}
@@ -149,7 +155,7 @@ export class ServerNats extends Server implements CustomTransportStrategy {
}
protected initializeSerializer(options: NatsOptions['options']) {
this.serializer = options?.serializer ?? new NatsJSONSerializer();
this.serializer = options?.serializer ?? new NatsRequestSerializer();
}
protected initializeDeserializer(options: NatsOptions['options']) {

View File

@@ -1,5 +1,5 @@
import { expect } from 'chai';
import { JSONCodec } from 'nats';
import { JSONCodec, headers as createHeaders } from 'nats';
import * as sinon from 'sinon';
import { ClientNats } from '../../client/client-nats';
@@ -8,7 +8,9 @@ describe('ClientNats', () => {
describe('publish', () => {
const pattern = 'test';
const msg = { pattern, data: 'data' };
const headers = createHeaders();
headers.set('1', '123');
const msg = { pattern, data: { headers, value: 'data' } };
const id = 3;
let subscribeSpy: sinon.SinonSpy,

View File

@@ -0,0 +1,110 @@
import { expect } from 'chai';
import {
NatsMessageBuilder,
NatsRequestSerializer,
} from '../../serializers/nats-request.serializer';
import * as nats from 'nats';
const jsonCodec = nats.JSONCodec();
describe('NatsRequestSerializer', () => {
let instance: NatsRequestSerializer;
beforeEach(() => {
instance = new NatsRequestSerializer();
});
describe('serialize', () => {
it('undefined', () => {
expect(instance.serialize({ data: undefined })).to.deep.eq({
headers: undefined,
data: jsonCodec.encode({ data: undefined }),
});
});
it('null', () => {
expect(instance.serialize({ data: null })).to.deep.eq({
headers: undefined,
data: jsonCodec.encode({ data: null }),
});
});
it('string', () => {
expect(instance.serialize({ data: 'string' })).to.deep.eq({
headers: undefined,
data: jsonCodec.encode({ data: 'string' }),
});
});
it('number', () => {
expect(instance.serialize({ data: 12345 })).to.deep.eq({
headers: undefined,
data: jsonCodec.encode({ data: 12345 }),
});
});
it('buffer', () => {
expect(instance.serialize({ data: Buffer.from('buffer') })).to.deep.eq({
headers: undefined,
data: jsonCodec.encode({ data: Buffer.from('buffer') }),
});
});
it('array', () => {
expect(instance.serialize({ data: [1, 2, 3, 4, 5] })).to.deep.eq({
headers: undefined,
data: jsonCodec.encode({ data: [1, 2, 3, 4, 5] }),
});
});
it('object', () => {
const serObject = { prop: 'value' };
expect(instance.serialize({ data: serObject })).to.deep.eq({
headers: undefined,
data: jsonCodec.encode({ data: serObject }),
});
});
});
describe('serialize nats message', () => {
it('nats message with data and nats headers', () => {
const natsHeaders = nats.headers();
natsHeaders.set('1', 'header_1');
const natsMessage = new NatsMessageBuilder()
.setHeaders(natsHeaders)
.setData({ value: 'string' })
.build();
expect(
instance.serialize({
data: natsMessage,
}),
).to.deep.eq({
headers: natsHeaders,
data: jsonCodec.encode({
data: {
value: 'string',
},
}),
});
});
it('nats message with data and plain headers', () => {
const natsHeaders = nats.headers();
natsHeaders.set('1', 'header_1');
const natsMessage = new NatsMessageBuilder()
.setPlainHeaders({ '1': 'header_1' })
.setData({ value: 'string' })
.build();
expect(
instance.serialize({
data: natsMessage,
}),
).to.deep.eq({
headers: natsHeaders,
data: jsonCodec.encode({
data: {
value: 'string',
},
}),
});
});
});
});