Merge pull request #8045 from tuxmachine/feat/msvc-static-headers

feat(microservices): add static headers
This commit is contained in:
Kamil Mysliwiec
2021-09-28 11:42:49 +02:00
committed by GitHub
8 changed files with 383 additions and 17 deletions

View File

@@ -11,7 +11,10 @@ import {
} from '../constants';
import { MqttClient } from '../external/mqtt-client.interface';
import { MqttOptions, ReadPacket, WritePacket } from '../interfaces';
import { MqttRecord } from '../record-builders/mqtt.record-builder';
import {
MqttRecord,
MqttRecordOptions,
} from '../record-builders/mqtt.record-builder';
import { MqttRequestSerializer } from '../serializers/mqtt-request.serializer';
import { ClientProxy } from './client-proxy';
@@ -141,7 +144,7 @@ export class ClientMqtt extends ClientProxy {
this.mqttClient.publish(
this.getRequestPattern(pattern),
JSON.stringify(serializedPacket),
serializedPacket.options,
this.mergePacketOptions(serializedPacket.options),
);
};
@@ -172,7 +175,7 @@ export class ClientMqtt extends ClientProxy {
this.mqttClient.publish(
pattern,
JSON.stringify(serializedPacket),
serializedPacket.options,
this.mergePacketOptions(serializedPacket.options),
(err: any) => (err ? reject(err) : resolve()),
),
);
@@ -190,4 +193,23 @@ export class ClientMqtt extends ClientProxy {
protected initializeSerializer(options: MqttOptions['options']) {
this.serializer = options?.serializer ?? new MqttRequestSerializer();
}
protected mergePacketOptions(
requestOptions?: MqttRecordOptions,
): MqttRecordOptions | undefined {
if (!requestOptions && !this.options?.userProperties) {
return undefined;
}
return {
...requestOptions,
properties: {
...requestOptions?.properties,
userProperties: {
...this.options?.userProperties,
...requestOptions?.properties?.userProperties,
},
},
};
}
}

View File

@@ -114,9 +114,11 @@ export class ClientNats extends ClientProxy {
callback: subscriptionHandler,
});
const headers = this.mergeHeaders(serializedPacket.headers);
this.natsClient.publish(channel, serializedPacket.data, {
reply: inbox,
headers: serializedPacket.headers,
headers,
});
return () => subscription.unsubscribe();
@@ -128,11 +130,12 @@ export class ClientNats extends ClientProxy {
protected dispatchEvent(packet: ReadPacket): Promise<any> {
const pattern = this.normalizePattern(packet.pattern);
const serializedPacket: NatsRecord = this.serializer.serialize(packet);
const headers = this.mergeHeaders(serializedPacket.headers);
return new Promise<void>((resolve, reject) => {
try {
this.natsClient.publish(pattern, serializedPacket.data, {
headers: serializedPacket.headers,
headers,
});
resolve();
} catch (err) {
@@ -149,4 +152,20 @@ export class ClientNats extends ClientProxy {
this.deserializer =
options?.deserializer ?? new NatsResponseJSONDeserializer();
}
protected mergeHeaders<THeaders = any>(requestHeaders?: THeaders) {
if (!requestHeaders && !this.options?.headers) {
return undefined;
}
const headers = requestHeaders ?? natsPackage.headers();
for (const [key, value] of Object.entries(this.options?.headers || {})) {
if (!headers.has(key)) {
headers.set(key, value);
}
}
return headers;
}
}

View File

@@ -201,6 +201,7 @@ export class ClientRMQ extends ClientProxy {
replyTo: this.replyQueue,
persistent: this.persistent,
...serializedPacket.options,
headers: this.mergeHeaders(serializedPacket.options?.headers),
correlationId,
},
);
@@ -221,6 +222,7 @@ export class ClientRMQ extends ClientProxy {
{
persistent: this.persistent,
...serializedPacket.options,
headers: this.mergeHeaders(serializedPacket.options?.headers),
},
(err: unknown) => (err ? reject(err) : resolve()),
),
@@ -230,4 +232,17 @@ export class ClientRMQ extends ClientProxy {
protected initializeSerializer(options: RmqOptions['options']) {
this.serializer = options?.serializer ?? new RmqRequestSerializer();
}
protected mergeHeaders(
requestHeaders?: Record<string, string>,
): Record<string, string> | undefined {
if (!requestHeaders && !this.options?.headers) {
return undefined;
}
return {
...this.options?.headers,
...requestHeaders,
};
}
}

View File

@@ -96,12 +96,14 @@ export interface MqttOptions {
url?: string;
serializer?: Serializer;
deserializer?: Deserializer;
userProperties?: Record<string, string | string[]>;
};
}
export interface NatsOptions {
transport?: Transport.NATS;
options?: {
headers?: Record<string, string>;
authenticator?: any;
debug?: boolean;
ignoreClusterUpdates?: boolean;
@@ -156,6 +158,7 @@ export interface RmqOptions {
deserializer?: Deserializer;
replyQueue?: string;
persistent?: boolean;
headers?: Record<string, string>;
};
}

View File

@@ -8,7 +8,7 @@ export interface RmqRecordOptions {
BCC?: string | string[];
contentType?: string;
contentEncoding?: string;
headers?: any;
headers?: Record<string, string>;
priority?: number;
messageId?: string;
timestamp?: number;

View File

@@ -3,10 +3,12 @@ import { empty } from 'rxjs';
import * as sinon from 'sinon';
import { ClientMqtt } from '../../client/client-mqtt';
import { ERROR_EVENT } from '../../constants';
import { ReadPacket } from '../../interfaces';
import { MqttRecord } from '../../record-builders';
describe('ClientMqtt', () => {
const test = 'test';
const client = new ClientMqtt({});
let client: ClientMqtt = new ClientMqtt({});
describe('getRequestPattern', () => {
it(`should leave pattern as it is`, () => {
@@ -21,7 +23,7 @@ describe('ClientMqtt', () => {
});
describe('publish', () => {
const pattern = 'test';
const msg = { pattern, data: 'data' };
let msg: ReadPacket;
let subscribeSpy: sinon.SinonSpy,
publishSpy: sinon.SinonSpy,
onSpy: sinon.SinonSpy,
@@ -33,6 +35,8 @@ describe('ClientMqtt', () => {
const id = '1';
beforeEach(() => {
client = new ClientMqtt({});
msg = { pattern, data: 'data' };
subscribeSpy = sinon.spy((name, fn) => fn());
publishSpy = sinon.spy();
onSpy = sinon.spy();
@@ -110,6 +114,52 @@ describe('ClientMqtt', () => {
expect(client['routingMap'].has(id)).to.be.false;
});
});
describe('headers', () => {
it('should not generate headers if none are configured', async () => {
await client['publish'](msg, () => {});
expect(publishSpy.getCall(0).args[2]).to.be.undefined;
});
it('should send packet headers', async () => {
const requestHeaders = { '1': '123' };
msg.data = new MqttRecord('data', {
properties: { userProperties: requestHeaders },
});
await client['publish'](msg, () => {});
expect(publishSpy.getCall(0).args[2].properties.userProperties).to.eql(
requestHeaders,
);
});
it('should combine packet and static headers', async () => {
const staticHeaders = { 'client-id': 'some-client-id' };
(client as any).options.userProperties = staticHeaders;
const requestHeaders = { '1': '123' };
msg.data = new MqttRecord('data', {
properties: { userProperties: requestHeaders },
});
await client['publish'](msg, () => {});
expect(publishSpy.getCall(0).args[2].properties.userProperties).to.eql({
...staticHeaders,
...requestHeaders,
});
});
it('should prefer packet headers over static headers', async () => {
const staticHeaders = { 'client-id': 'some-client-id' };
(client as any).options.headers = staticHeaders;
const requestHeaders = { 'client-id': 'override-client-id' };
msg.data = new MqttRecord('data', {
properties: { userProperties: requestHeaders },
});
await client['publish'](msg, () => {});
expect(publishSpy.getCall(0).args[2].properties.userProperties).to.eql(
requestHeaders,
);
});
});
});
describe('createResponseCallback', () => {
let callback: sinon.SinonSpy, subscription;
@@ -277,10 +327,12 @@ describe('ClientMqtt', () => {
});
});
describe('dispatchEvent', () => {
const msg = { pattern: 'pattern', data: 'data' };
let msg: ReadPacket;
let publishStub: sinon.SinonStub, mqttClient;
beforeEach(() => {
client = new ClientMqtt({});
msg = { pattern: 'pattern', data: 'data' };
publishStub = sinon.stub();
mqttClient = {
publish: publishStub,
@@ -300,5 +352,57 @@ describe('ClientMqtt', () => {
expect(err).to.be.instanceOf(Error),
);
});
describe('headers', () => {
it('should not generate headers if none are configured', async () => {
publishStub.callsFake((a, b, c, d) => d());
await client['dispatchEvent'](msg);
expect(publishStub.getCall(0).args[2]).to.be.undefined;
});
it('should send packet headers', async () => {
publishStub.callsFake((a, b, c, d) => d());
const requestHeaders = { '1': '123' };
msg.data = new MqttRecord('data', {
properties: { userProperties: requestHeaders },
});
await client['dispatchEvent'](msg);
expect(publishStub.getCall(0).args[2].properties.userProperties).to.eql(
requestHeaders,
);
});
it('should combine packet and static headers', async () => {
publishStub.callsFake((a, b, c, d) => d());
const staticHeaders = { 'client-id': 'some-client-id' };
(client as any).options.userProperties = staticHeaders;
const requestHeaders = { '1': '123' };
msg.data = new MqttRecord('data', {
properties: { userProperties: requestHeaders },
});
await client['dispatchEvent'](msg);
expect(publishStub.getCall(0).args[2].properties.userProperties).to.eql(
{
...staticHeaders,
...requestHeaders,
},
);
});
it('should prefer packet headers over static headers', async () => {
publishStub.callsFake((a, b, c, d) => d());
const staticHeaders = { 'client-id': 'some-client-id' };
(client as any).options.headers = staticHeaders;
const requestHeaders = { 'client-id': 'override-client-id' };
msg.data = new MqttRecord('data', {
properties: { userProperties: requestHeaders },
});
await client['dispatchEvent'](msg);
expect(publishStub.getCall(0).args[2].properties.userProperties).to.eql(
requestHeaders,
);
});
});
});
});

View File

@@ -1,16 +1,16 @@
import { expect } from 'chai';
import { JSONCodec, headers as createHeaders } from 'nats';
import { headers as createHeaders, JSONCodec } from 'nats';
import * as sinon from 'sinon';
import { ClientNats } from '../../client/client-nats';
import { ReadPacket } from '../../interfaces';
import { NatsRecord } from '../../record-builders';
describe('ClientNats', () => {
const client = new ClientNats({});
let client: ClientNats;
describe('publish', () => {
let msg: ReadPacket;
const pattern = 'test';
const headers = createHeaders();
headers.set('1', '123');
const msg = { pattern, data: { headers, value: 'data' } };
const id = 3;
let subscribeSpy: sinon.SinonSpy,
@@ -23,6 +23,8 @@ describe('ClientNats', () => {
createClient: sinon.SinonStub;
beforeEach(() => {
client = new ClientNats({});
msg = { pattern, data: 'data' };
unsubscribeSpy = sinon.spy();
subscription = {
unsubscribe: unsubscribeSpy,
@@ -96,7 +98,54 @@ describe('ClientNats', () => {
expect(unsubscribeSpy.called).to.be.true;
});
});
describe('headers', () => {
it('should not generate headers if none are configured', () => {
client['publish'](msg, () => {});
expect(natsClient.publish.getCall(0).args[2].headers).to.be.undefined;
});
it('should send packet headers', () => {
const requestHeaders = createHeaders();
requestHeaders.set('1', '123');
msg.data = new NatsRecord('data', requestHeaders);
client['publish'](msg, () => {});
expect(natsClient.publish.getCall(0).args[2].headers.get('1')).to.eql(
'123',
);
});
it('should combine packet and static headers', () => {
const staticHeaders = { 'client-id': 'some-client-id' };
(client as any).options.headers = staticHeaders;
const requestHeaders = createHeaders();
requestHeaders.set('1', '123');
msg.data = new NatsRecord('data', requestHeaders);
client['publish'](msg, () => {});
expect(publishSpy.getCall(0).args[2].headers.get('client-id')).to.eql(
'some-client-id',
);
expect(publishSpy.getCall(0).args[2].headers.get('1')).to.eql('123');
});
it('should prefer packet headers over static headers', () => {
const staticHeaders = { 'client-id': 'some-client-id' };
(client as any).options.headers = staticHeaders;
const requestHeaders = createHeaders();
requestHeaders.set('client-id', 'override-client-id');
msg.data = new NatsRecord('data', requestHeaders);
client['publish'](msg, () => {});
expect(publishSpy.getCall(0).args[2].headers.get('client-id')).to.eql(
'override-client-id',
);
});
});
});
describe('createSubscriptionHandler', () => {
const pattern = 'test';
const msg = { pattern, data: 'data', id: '1' };
@@ -279,10 +328,12 @@ describe('ClientNats', () => {
});
});
describe('dispatchEvent', () => {
const msg = { pattern: 'pattern', data: 'data' };
let msg: ReadPacket;
let subscribeStub: sinon.SinonStub, natsClient: any;
beforeEach(() => {
client = new ClientNats({});
msg = { pattern: 'pattern', data: 'data' };
subscribeStub = sinon
.stub()
.callsFake((channel, options) => options.callback());
@@ -298,6 +349,7 @@ describe('ClientNats', () => {
expect(natsClient.publish.called).to.be.true;
});
it('should throw error', async () => {
subscribeStub.callsFake((channel, options) =>
options.callback(new Error()),
@@ -306,5 +358,54 @@ describe('ClientNats', () => {
expect(err).to.be.instanceOf(Error),
);
});
describe('headers', () => {
it('should not generate headers if none are configured', () => {
client['dispatchEvent'](msg);
expect(natsClient.publish.getCall(0).args[2].headers).to.be.undefined;
});
it('should send packet headers', () => {
const requestHeaders = createHeaders();
requestHeaders.set('1', '123');
msg.data = new NatsRecord('data', requestHeaders);
client['dispatchEvent'](msg);
expect(natsClient.publish.getCall(0).args[2].headers.get('1')).to.eql(
'123',
);
});
it('should combine packet and static headers', () => {
const staticHeaders = { 'client-id': 'some-client-id' };
(client as any).options.headers = staticHeaders;
const requestHeaders = createHeaders();
requestHeaders.set('1', '123');
msg.data = new NatsRecord('data', requestHeaders);
client['dispatchEvent'](msg);
expect(
natsClient.publish.getCall(0).args[2].headers.get('client-id'),
).to.eql('some-client-id');
expect(natsClient.publish.getCall(0).args[2].headers.get('1')).to.eql(
'123',
);
});
it('should prefer packet headers over static headers', () => {
const staticHeaders = { 'client-id': 'some-client-id' };
(client as any).options.headers = staticHeaders;
const requestHeaders = createHeaders();
requestHeaders.set('client-id', 'override-client-id');
msg.data = new NatsRecord('data', requestHeaders);
client['dispatchEvent'](msg);
expect(
natsClient.publish.getCall(0).args[2].headers.get('client-id'),
).to.eql('override-client-id');
});
});
});
});

View File

@@ -3,6 +3,8 @@ import { EventEmitter } from 'events';
import { empty } from 'rxjs';
import * as sinon from 'sinon';
import { ClientRMQ } from '../../client/client-rmq';
import { ReadPacket } from '../../interfaces';
import { RmqRecord } from '../../record-builders';
describe('ClientRMQ', function () {
this.retries(10);
@@ -169,12 +171,14 @@ describe('ClientRMQ', function () {
describe('publish', () => {
const pattern = 'test';
const msg = { pattern, data: 'data' };
let msg: ReadPacket;
let connectSpy: sinon.SinonSpy,
sendToQueueSpy: sinon.SinonSpy,
eventSpy: sinon.SinonSpy;
beforeEach(() => {
client = new ClientRMQ({});
msg = { pattern, data: 'data' };
connectSpy = sinon.spy(client, 'connect');
eventSpy = sinon.spy();
sendToQueueSpy = sinon.spy();
@@ -223,6 +227,54 @@ describe('ClientRMQ', function () {
expect(unsubscribeSpy.called).to.be.true;
});
});
describe('headers', () => {
it('should not generate headers if none are configured', () => {
client['publish'](msg, () => {
expect(sendToQueueSpy.getCall(0).args[2].headers).to.be.undefined;
});
});
it('should send packet headers', () => {
const requestHeaders = { '1': '123' };
msg.data = new RmqRecord('data', { headers: requestHeaders });
client['publish'](msg, () => {
expect(sendToQueueSpy.getCall(0).args[2].headers).to.eql(
requestHeaders,
);
});
});
it('should combine packet and static headers', () => {
const staticHeaders = { 'client-id': 'some-client-id' };
(client as any).options.headers = staticHeaders;
const requestHeaders = { '1': '123' };
msg.data = new RmqRecord('data', { headers: requestHeaders });
client['publish'](msg, () => {
expect(sendToQueueSpy.getCall(0).args[2].headers).to.eql({
...staticHeaders,
...requestHeaders,
});
});
});
it('should prefer packet headers over static headers', () => {
const staticHeaders = { 'client-id': 'some-client-id' };
(client as any).options.headers = staticHeaders;
const requestHeaders = { 'client-id': 'override-client-id' };
msg.data = new RmqRecord('data', { headers: requestHeaders });
client['publish'](msg, () => {
expect(sendToQueueSpy.getCall(0).args[2].headers).to.eql(
requestHeaders,
);
});
});
});
});
describe('handleMessage', () => {
@@ -313,10 +365,12 @@ describe('ClientRMQ', function () {
});
});
describe('dispatchEvent', () => {
const msg = { pattern: 'pattern', data: 'data' };
let msg: ReadPacket;
let sendToQueueStub: sinon.SinonStub, channel;
beforeEach(() => {
client = new ClientRMQ({});
msg = { pattern: 'pattern', data: 'data' };
sendToQueueStub = sinon.stub();
channel = {
sendToQueue: sendToQueueStub,
@@ -336,5 +390,53 @@ describe('ClientRMQ', function () {
expect(err).to.be.instanceOf(Error),
);
});
describe('headers', () => {
it('should not generate headers if none are configured', async () => {
sendToQueueStub.callsFake((a, b, c, d) => d());
await client['dispatchEvent'](msg);
expect(sendToQueueStub.getCall(0).args[2].headers).to.be.undefined;
});
it('should send packet headers', async () => {
sendToQueueStub.callsFake((a, b, c, d) => d());
const requestHeaders = { '1': '123' };
msg.data = new RmqRecord('data', { headers: requestHeaders });
await client['dispatchEvent'](msg);
expect(sendToQueueStub.getCall(0).args[2].headers).to.eql(
requestHeaders,
);
});
it('should combine packet and static headers', async () => {
sendToQueueStub.callsFake((a, b, c, d) => d());
const staticHeaders = { 'client-id': 'some-client-id' };
(client as any).options.headers = staticHeaders;
const requestHeaders = { '1': '123' };
msg.data = new RmqRecord('data', { headers: requestHeaders });
await client['dispatchEvent'](msg);
expect(sendToQueueStub.getCall(0).args[2].headers).to.eql({
...staticHeaders,
...requestHeaders,
});
});
it('should prefer packet headers over static headers', async () => {
sendToQueueStub.callsFake((a, b, c, d) => d());
const staticHeaders = { 'client-id': 'some-client-id' };
(client as any).options.headers = staticHeaders;
const requestHeaders = { 'client-id': 'override-client-id' };
msg.data = new RmqRecord('data', { headers: requestHeaders });
await client['dispatchEvent'](msg);
expect(sendToQueueStub.getCall(0).args[2].headers).to.eql(
requestHeaders,
);
});
});
});
});