fix(microservices): pass options to rmq deserialize

This commit is contained in:
bangbang93
2022-07-26 15:46:18 +08:00
parent b98e4d2b1f
commit bfbcd7bb80
3 changed files with 20 additions and 7 deletions

View File

@@ -186,10 +186,17 @@ export class ClientRMQ extends ClientProxy {
public async handleMessage(
packet: unknown,
options: Record<string, unknown>,
callback: (packet: WritePacket) => any,
) {
if (typeof options === 'function') {
callback = options;
options = undefined;
}
const { err, response, isDisposed } = await this.deserializer.deserialize(
packet,
options,
);
if (isDisposed || err) {
callback({
@@ -210,8 +217,14 @@ export class ClientRMQ extends ClientProxy {
): () => void {
try {
const correlationId = randomStringGenerator();
const listener = ({ content }: { content: any }) =>
this.handleMessage(JSON.parse(content.toString()), callback);
const listener = ({
content,
options,
}: {
content: any;
options: Record<string, unknown>;
}) =>
this.handleMessage(JSON.parse(content.toString()), options, callback);
Object.assign(message, { id: correlationId });
const serializedPacket: ReadPacket & Partial<RmqRecord> =

View File

@@ -11,12 +11,12 @@ import {
DISCONNECTED_RMQ_MESSAGE,
NO_MESSAGE_HANDLER,
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
RQM_DEFAULT_NO_ASSERT,
RQM_DEFAULT_NOACK,
RQM_DEFAULT_PREFETCH_COUNT,
RQM_DEFAULT_QUEUE,
RQM_DEFAULT_QUEUE_OPTIONS,
RQM_DEFAULT_URL,
RQM_DEFAULT_NO_ASSERT,
} from '../constants';
import { RmqContext } from '../ctx-host';
import { Transport } from '../enums';
@@ -142,7 +142,7 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
}
const { content, properties } = message;
const rawMessage = JSON.parse(content.toString());
const packet = await this.deserializer.deserialize(rawMessage);
const packet = await this.deserializer.deserialize(rawMessage, properties);
const pattern = isString(packet.pattern)
? packet.pattern
: JSON.stringify(packet.pattern);

View File

@@ -290,7 +290,7 @@ describe('ClientRMQ', function () {
response: 'test',
isDisposed: false,
};
await client.handleMessage(packet, callback);
await client.handleMessage(packet, undefined, callback);
expect(
callback.calledWith({
err: packet.err,
@@ -311,7 +311,7 @@ describe('ClientRMQ', function () {
response: 'test',
isDisposed: true,
};
await client.handleMessage(packet, callback);
await client.handleMessage(packet, undefined, callback);
expect(
callback.calledWith({
err: undefined,
@@ -333,7 +333,7 @@ describe('ClientRMQ', function () {
response: 'test',
isDisposed: false,
};
await client.handleMessage(packet, callback);
await client.handleMessage(packet, undefined, callback);
expect(
callback.calledWith({
err: undefined,