From c3ed27dab6d8d34f6a4245893e333d52b505ea60 Mon Sep 17 00:00:00 2001 From: Serafim Gerasimov Date: Tue, 14 Oct 2025 21:06:47 +0300 Subject: [PATCH] test(microservices): Integration test for the fanout exchange --- .../e2e/fanout-exchange-rmq.spec.ts | 46 +++++++++++++++++++ ...fanout-exchange-consumer-rmq.controller.ts | 12 +++++ ...fanout-exchange-producer-rmq.controller.ts | 28 +++++++++++ 3 files changed, 86 insertions(+) create mode 100644 integration/microservices/e2e/fanout-exchange-rmq.spec.ts create mode 100644 integration/microservices/src/rmq/fanout-exchange-consumer-rmq.controller.ts create mode 100644 integration/microservices/src/rmq/fanout-exchange-producer-rmq.controller.ts diff --git a/integration/microservices/e2e/fanout-exchange-rmq.spec.ts b/integration/microservices/e2e/fanout-exchange-rmq.spec.ts new file mode 100644 index 000000000..c271f88bd --- /dev/null +++ b/integration/microservices/e2e/fanout-exchange-rmq.spec.ts @@ -0,0 +1,46 @@ +import { INestApplication, INestMicroservice } from '@nestjs/common'; +import { MicroserviceOptions, Transport } from '@nestjs/microservices'; +import { Test } from '@nestjs/testing'; +import * as request from 'supertest'; +import { RMQFanoutExchangeProducerController } from '../src/rmq/fanout-exchange-producer-rmq.controller'; +import { RMQFanoutExchangeConsumerController } from '../src/rmq/fanout-exchange-consumer-rmq.controller'; + +describe('RabbitMQ transport (Fanout Exchange)', () => { + let server: any; + let appProducer: INestApplication; + let appConsumer: INestMicroservice; + + beforeEach(async () => { + const producerModule = await Test.createTestingModule({ + controllers: [RMQFanoutExchangeProducerController], + }).compile(); + const consumerModule = await Test.createTestingModule({ + controllers: [RMQFanoutExchangeConsumerController], + }).compile(); + + appProducer = producerModule.createNestApplication(); + server = appProducer.getHttpAdapter().getInstance(); + + appConsumer = consumerModule.createNestMicroservice({ + transport: Transport.RMQ, + options: { + urls: [`amqp://0.0.0.0:5672`], + queue: '', + exchange: 'test.fanout', + exchangeType: 'fanout', + queueOptions: { + exclusive: true, + }, + }, + }); + await Promise.all([appProducer.init(), appConsumer.listen()]); + }); + + it(`should send message to fanout exchange`, async () => { + await request(server).get('/fanout-exchange').expect(200, 'ping/pong'); + }); + + afterEach(async () => { + await Promise.all([appProducer.close(), appConsumer.close()]); + }); +}); diff --git a/integration/microservices/src/rmq/fanout-exchange-consumer-rmq.controller.ts b/integration/microservices/src/rmq/fanout-exchange-consumer-rmq.controller.ts new file mode 100644 index 000000000..ebfc62e6e --- /dev/null +++ b/integration/microservices/src/rmq/fanout-exchange-consumer-rmq.controller.ts @@ -0,0 +1,12 @@ +import { Controller } from '@nestjs/common'; +import { Ctx, MessagePattern, RmqContext } from '@nestjs/microservices'; + +@Controller() +export class RMQFanoutExchangeConsumerController { + constructor() {} + + @MessagePattern('ping') + handleTopicExchange(@Ctx() ctx: RmqContext): string { + return ctx.getPattern() + '/pong'; + } +} diff --git a/integration/microservices/src/rmq/fanout-exchange-producer-rmq.controller.ts b/integration/microservices/src/rmq/fanout-exchange-producer-rmq.controller.ts new file mode 100644 index 000000000..78c194691 --- /dev/null +++ b/integration/microservices/src/rmq/fanout-exchange-producer-rmq.controller.ts @@ -0,0 +1,28 @@ +import { Controller, Get } from '@nestjs/common'; +import { + ClientProxy, + ClientProxyFactory, + Transport, +} from '@nestjs/microservices'; +import { lastValueFrom } from 'rxjs'; + +@Controller() +export class RMQFanoutExchangeProducerController { + client: ClientProxy; + + constructor() { + this.client = ClientProxyFactory.create({ + transport: Transport.RMQ, + options: { + urls: [`amqp://localhost:5672`], + exchange: 'test.fanout', + exchangeType: 'fanout', + }, + }); + } + + @Get('fanout-exchange') + async topicExchange() { + return lastValueFrom(this.client.send('ping', 1)); + } +}