test(microservices): RMQ Client and Server queue bind

Tests for new fanout exchange functional

FIXES: #15746
This commit is contained in:
Serafim Gerasimov
2025-10-07 22:52:40 +03:00
parent 5238114a0e
commit bf59ad2ecc
2 changed files with 47 additions and 1 deletions

View File

@@ -119,11 +119,13 @@ describe('ClientRMQ', function () {
describe('setupChannel', () => {
const queue = 'test';
const exchange = 'test.exchange';
const queueOptions = {};
const isGlobalPrefetchCount = true;
const prefetchCount = 10;
let consumeStub: sinon.SinonStub;
let bindQueueStub: sinon.SinonStub;
let channel: any = {};
beforeEach(() => {
@@ -134,7 +136,9 @@ describe('ClientRMQ', function () {
channel = {
assertQueue: sinon.spy(() => ({})),
prefetch: sinon.spy(),
bindQueue: bindQueueStub,
};
bindQueueStub = sinon.stub();
consumeStub = sinon.stub(client, 'consumeChannel').callsFake(() => null!);
});
afterEach(() => {
@@ -146,6 +150,12 @@ describe('ClientRMQ', function () {
await client.setupChannel(channel, () => null);
expect(channel.assertQueue.calledWith(queue, queueOptions)).to.be.true;
});
it('should call "bindQueue" with exchangeType is fanout', async () => {
untypedClient['options']['exchangeType'] = 'fanout';
untypedClient['options']['exchange'] = exchange;
await client.setupChannel(channel, () => null);
expect(channel.bindQueue.calledWith(queue, exchange, '')).to.be.true;
});
it('should not call "assertQueue" when noAssert is true', async () => {
client['noAssert'] = true;
@@ -183,9 +193,11 @@ describe('ClientRMQ', function () {
describe('publish', () => {
const pattern = 'test';
const exchange = 'test.exchange';
let msg: ReadPacket;
let connectSpy: sinon.SinonSpy,
sendToQueueStub: sinon.SinonStub,
publishStub: sinon.SinonStub,
eventSpy: sinon.SinonSpy;
beforeEach(() => {
@@ -196,9 +208,11 @@ describe('ClientRMQ', function () {
connectSpy = sinon.spy(client, 'connect');
eventSpy = sinon.spy();
sendToQueueStub = sinon.stub().callsFake(() => ({ catch: sinon.spy() }));
publishStub = sinon.stub().callsFake(() => ({ catch: sinon.spy() }));
client['channel'] = {
sendToQueue: sendToQueueStub,
publish: publishStub,
};
client['responseEmitter'] = new EventEmitter();
client['responseEmitter'].on(pattern, eventSpy);
@@ -214,6 +228,14 @@ describe('ClientRMQ', function () {
expect(sendToQueueStub.getCall(0).args[0]).to.be.eql(client['queue']);
});
});
it('should send message to exchange when exchangeType is fanout', async () => {
untypedClient['options']['exchangeType'] = 'fanout';
untypedClient['options']['exchange'] = exchange;
client['publish'](msg, () => {
expect(publishStub.called).to.be.true;
expect(publishStub.getCall(0).args[0]).to.be.eql(exchange);
});
});
it('should send buffer from stringified message', () => {
client['publish'](msg, () => {
@@ -380,7 +402,8 @@ describe('ClientRMQ', function () {
});
describe('dispatchEvent', () => {
let msg: ReadPacket;
let sendToQueueStub: sinon.SinonStub, channel;
const exchange = 'test.exchange';
let sendToQueueStub: sinon.SinonStub, publishStub: sinon.SinonStub, channel;
beforeEach(() => {
client = new ClientRMQ({});
@@ -388,8 +411,10 @@ describe('ClientRMQ', function () {
msg = { pattern: 'pattern', data: 'data' };
sendToQueueStub = sinon.stub();
publishStub = sinon.stub();
channel = {
sendToQueue: sendToQueueStub,
publish: publishStub,
};
untypedClient.channel = channel;
});
@@ -400,6 +425,15 @@ describe('ClientRMQ', function () {
expect(sendToQueueStub.called).to.be.true;
});
it('should publish packet to exchange when exchangeType is fanout', async () => {
untypedClient['options']['exchangeType'] = 'fanout';
untypedClient['options']['exchange'] = exchange;
publishStub.callsFake((a, b, c, d, f) => f());
await client['dispatchEvent'](msg);
expect(publishStub.called).to.be.true;
expect(publishStub.getCall(0).args[0]).to.be.eql(exchange);
});
it('should throw error', async () => {
sendToQueueStub.callsFake((a, b, c, d) => d(new Error()));
client['dispatchEvent'](msg).catch(err =>

View File

@@ -179,6 +179,7 @@ describe('ServerRMQ', () => {
});
describe('setupChannel', () => {
const queue = 'test';
const exchange = 'test.exchange';
const queueOptions = {};
const isGlobalPrefetchCount = true;
const prefetchCount = 10;
@@ -197,6 +198,8 @@ describe('ServerRMQ', () => {
assertQueue: sinon.spy(() => ({})),
prefetch: sinon.spy(),
consume: sinon.spy(),
assertExchange: sinon.spy(() => ({})),
bindQueue: sinon.spy(),
};
});
it('should call "assertQueue" with queue and queue options when noAssert is false', async () => {
@@ -214,6 +217,15 @@ describe('ServerRMQ', () => {
await server.setupChannel(channel, () => null);
expect(channel.assertQueue.called).not.to.be.true;
});
it('should call "bindQueue" with exchangeType is fanout', async () => {
server['options' as any] = {
...(server as any)['options'],
exchangeType: 'fanout',
exchange: exchange,
};
await server.setupChannel(channel, () => null);
expect(channel.bindQueue.calledWith(queue, exchange, '')).to.be.true;
});
it('should call "prefetch" with prefetchCount and "isGlobalPrefetchCount"', async () => {
await server.setupChannel(channel, () => null);
expect(channel.prefetch.calledWith(prefetchCount, isGlobalPrefetchCount))