chore(microservices): Add persistent property to RmqOptions

Adding the persistent property to RmqOptions so messages do not get lost should the RabbitMQ broker restart.
This commit is contained in:
Ashley Meadows
2020-10-22 20:02:38 +01:00
parent 948226eba7
commit 491d1934ad
3 changed files with 10 additions and 1 deletions

View File

@@ -10,6 +10,7 @@ import {
ERROR_EVENT,
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
RQM_DEFAULT_NOACK,
RQM_DEFAULT_PERSISTENT,
RQM_DEFAULT_PREFETCH_COUNT,
RQM_DEFAULT_QUEUE,
RQM_DEFAULT_QUEUE_OPTIONS,
@@ -32,6 +33,7 @@ export class ClientRMQ extends ClientProxy {
protected queueOptions: any;
protected responseEmitter: EventEmitter;
protected replyQueue: string;
protected persistent: boolean;
constructor(protected readonly options: RmqOptions['options']) {
super();
@@ -43,6 +45,8 @@ export class ClientRMQ extends ClientProxy {
RQM_DEFAULT_QUEUE_OPTIONS;
this.replyQueue =
this.getOptionsProp(this.options, 'replyQueue') || REPLY_QUEUE;
this.persistent =
this.getOptionsProp(this.options, 'persistent') || RQM_DEFAULT_PERSISTENT;
loadPackage('amqplib', ClientRMQ.name, () => require('amqplib'));
rqmPackage = loadPackage('amqp-connection-manager', ClientRMQ.name, () =>
require('amqp-connection-manager'),
@@ -187,6 +191,7 @@ export class ClientRMQ extends ClientProxy {
{
replyTo: this.replyQueue,
correlationId,
persistent: this.persistent,
},
);
return () => this.responseEmitter.removeListener(correlationId, listener);
@@ -202,7 +207,9 @@ export class ClientRMQ extends ClientProxy {
this.channel.sendToQueue(
this.queue,
Buffer.from(JSON.stringify(serializedPacket)),
{},
{
persistent: this.persistent,
},
err => (err ? reject(err) : resolve()),
),
);