chore: resolve conflicts

This commit is contained in:
Kamil Myśliwiec
2024-11-27 12:52:09 +01:00
4 changed files with 42 additions and 39 deletions

View File

@@ -1,5 +1,6 @@
import { Logger } from '@nestjs/common/services/logger.service';
import { loadPackage } from '@nestjs/common/utils/load-package.util';
import { isObject } from '@nestjs/common/utils/shared.utils';
import { EmptyError, fromEvent, lastValueFrom, merge, Observable } from 'rxjs';
import { first, map, share, tap } from 'rxjs/operators';
import { ECONNREFUSED, ENOTFOUND, MQTT_DEFAULT_URL } from '../constants';
@@ -222,10 +223,8 @@ export class ClientMqtt extends ClientProxy<MqttEvents, MqttStatus> {
try {
const packet = this.assignPacketId(partialPacket);
const pattern = this.normalizePattern(partialPacket.pattern);
const serializedPacket: ReadPacket & Partial<MqttRecord> =
this.serializer.serialize(packet);
const responseChannel = this.getResponsePattern(pattern);
let subscriptionsCount =
this.subscriptionsCount.get(responseChannel) || 0;
@@ -234,12 +233,17 @@ export class ClientMqtt extends ClientProxy<MqttEvents, MqttStatus> {
this.subscriptionsCount.set(responseChannel, subscriptionsCount + 1);
this.routingMap.set(packet.id, callback);
const options = serializedPacket.options;
delete serializedPacket.options;
const options =
isObject(packet?.data) && packet.data instanceof MqttRecord
? (packet.data as MqttRecord)?.options
: undefined;
delete packet?.data?.options;
const serializedPacket: string | Buffer =
this.serializer.serialize(packet);
this.mqttClient!.publish(
this.getRequestPattern(pattern),
JSON.stringify(serializedPacket),
serializedPacket,
this.mergePacketOptions(options),
);
};
@@ -265,16 +269,17 @@ export class ClientMqtt extends ClientProxy<MqttEvents, MqttStatus> {
protected dispatchEvent(packet: ReadPacket): Promise<any> {
const pattern = this.normalizePattern(packet.pattern);
const serializedPacket: ReadPacket & Partial<MqttRecord> =
this.serializer.serialize(packet);
const options = serializedPacket.options;
delete serializedPacket.options;
const options =
isObject(packet?.data) && packet.data instanceof MqttRecord
? (packet.data as MqttRecord)?.options
: undefined;
delete packet?.data?.options;
const serializedPacket: string | Buffer = this.serializer.serialize(packet);
return new Promise<void>((resolve, reject) =>
this.mqttClient!.publish(
pattern,
JSON.stringify(serializedPacket),
serializedPacket,
this.mergePacketOptions(options),
(err: any) => (err ? reject(err) : resolve()),
),

View File

@@ -2,22 +2,15 @@ import { isObject } from '@nestjs/common/utils/shared.utils';
import { ReadPacket, Serializer } from '../interfaces';
import { MqttRecord } from '../record-builders';
export class MqttRecordSerializer
implements Serializer<ReadPacket, ReadPacket & Partial<MqttRecord>>
{
serialize(packet: ReadPacket): ReadPacket & Partial<MqttRecord> {
if (
packet?.data &&
isObject(packet.data) &&
packet.data instanceof MqttRecord
) {
const record = packet.data;
return {
export class MqttRecordSerializer implements Serializer<ReadPacket, string> {
serialize(packet: ReadPacket): string {
if (isObject(packet?.data) && packet.data instanceof MqttRecord) {
const record = packet.data as MqttRecord;
return JSON.stringify({
...packet,
data: record.data,
options: record.options,
};
});
}
return packet;
return JSON.stringify(packet);
}
}

View File

@@ -1,4 +1,4 @@
import { isUndefined } from '@nestjs/common/utils/shared.utils';
import { isObject, isUndefined } from '@nestjs/common/utils/shared.utils';
import {
MQTT_DEFAULT_URL,
MQTT_SEPARATOR,
@@ -149,14 +149,18 @@ export class ServerMqtt extends Server<MqttEvents, MqttStatus> {
public getPublisher(client: MqttClient, pattern: any, id: string): any {
return (response: any) => {
Object.assign(response, { id });
const outgoingResponse: Partial<MqttRecord> =
this.serializer.serialize(response);
const options = outgoingResponse.options;
delete outgoingResponse.options;
const options =
isObject(response?.data) && response.data instanceof MqttRecord
? (response.data as MqttRecord)?.options
: {};
delete response?.data?.options;
const outgoingResponse: string | Buffer =
this.serializer.serialize(response);
return client.publish(
this.getReplyPattern(pattern),
JSON.stringify(outgoingResponse),
outgoingResponse,
options,
);
};

View File

@@ -8,7 +8,7 @@ describe('MqttRecordSerializer', () => {
instance = new MqttRecordSerializer();
});
describe('serialize', () => {
it('should parse mqtt record instance', () => {
it('should parse mqtt record instance to a string, ignoring options', () => {
const mqttMessage = new MqttRecordBuilder()
.setData({ value: 'string' })
.setQoS(1)
@@ -22,18 +22,19 @@ describe('MqttRecordSerializer', () => {
pattern: 'pattern',
data: mqttMessage,
}),
).to.deep.eq({
).to.deep.eq(
JSON.stringify({
pattern: 'pattern',
options: { qos: 1, retain: true, dup: true, properties: {} },
data: { value: 'string' },
});
}),
);
});
it('should act as an identity function if msg is not an instance of MqttRecord class', () => {
const packet = {
pattern: 'pattern',
data: { random: true },
};
expect(instance.serialize(packet)).to.eq(packet);
expect(instance.serialize(packet)).to.eq(JSON.stringify(packet));
});
});
});