feat(): add rpc serializer and deserializer, improve performance

This commit is contained in:
Kamil Myśliwiec
2019-07-26 13:33:59 +02:00
parent 9885309cd5
commit 7f8aa92c0a
33 changed files with 368 additions and 119 deletions

View File

@@ -45,6 +45,9 @@ export class ClientRMQ extends ClientProxy {
rqmPackage = loadPackage('amqp-connection-manager', ClientRMQ.name, () =>
require('amqp-connection-manager'),
);
this.initializeSerializer(options);
this.initializeDeserializer(options);
}
public close(): void {
@@ -132,14 +135,14 @@ export class ClientRMQ extends ClientProxy {
}
public handleMessage(
packet: WritePacket,
packet: unknown,
callback: (packet: WritePacket) => any,
) {
const { err, response, isDisposed } = packet;
const { err, response, isDisposed } = this.deserializer.deserialize(packet);
if (isDisposed || err) {
callback({
err,
response: null,
response,
isDisposed: true,
});
}
@@ -159,10 +162,12 @@ export class ClientRMQ extends ClientProxy {
this.handleMessage(JSON.parse(content.toString()), callback);
Object.assign(message, { id: correlationId });
const serializedPacket = this.serializer.serialize(message);
this.responseEmitter.on(correlationId, listener);
this.channel.sendToQueue(
this.queue,
Buffer.from(JSON.stringify(message)),
Buffer.from(JSON.stringify(serializedPacket)),
{
replyTo: REPLY_QUEUE,
correlationId,
@@ -175,10 +180,12 @@ export class ClientRMQ extends ClientProxy {
}
protected dispatchEvent(packet: ReadPacket): Promise<any> {
const serializedPacket = this.serializer.serialize(packet);
return new Promise((resolve, reject) =>
this.channel.sendToQueue(
this.queue,
Buffer.from(JSON.stringify(packet)),
Buffer.from(JSON.stringify(serializedPacket)),
{},
err => (err ? reject(err) : resolve()),
),