fix(microservices): multiple rmq client urls issue #9364

This commit is contained in:
Kamil Myśliwiec
2022-04-01 16:51:46 +02:00
parent 43a0514edd
commit af0e4001ff
3 changed files with 42 additions and 2 deletions

View File

@@ -77,6 +77,13 @@ describe('RabbitMQ transport', () => {
.expect(200, '15'); .expect(200, '15');
}); });
it(`/POST (multiple-urls)`, () => {
return request(server)
.post('/multiple-urls')
.send([1, 2, 3, 4, 5])
.expect(200, '15');
}).timeout(10000);
it(`/POST (event notification)`, done => { it(`/POST (event notification)`, done => {
request(server) request(server)
.post('/notify') .post('/notify')

View File

@@ -61,6 +61,21 @@ export class RMQController {
.reduce(async (a, b) => (await a) && b); .reduce(async (a, b) => (await a) && b);
} }
@Post('multiple-urls')
@HttpCode(200)
multipleUrls(@Body() data: number[]) {
const clientWithMultipleUrls = ClientProxyFactory.create({
transport: Transport.RMQ,
options: {
urls: [`amqp://localhost:5671`, `amqp://localhost:5672`],
queue: 'test',
queueOptions: { durable: false },
socketOptions: { noDelay: true },
},
});
return clientWithMultipleUrls.send<number>({ cmd: 'multiple-urls' }, data);
}
@Post('record-builder-duplex') @Post('record-builder-duplex')
@HttpCode(200) @HttpCode(200)
useRecordBuilderDuplex(@Body() data: Record<string, any>) { useRecordBuilderDuplex(@Body() data: Record<string, any>) {
@@ -109,6 +124,11 @@ export class RMQController {
return from(data); return from(data);
} }
@MessagePattern({ cmd: 'multiple-urls' })
handleMultipleUrls(data: number[]): number {
return (data || []).reduce((a, b) => a + b);
}
@Post('notify') @Post('notify')
async sendNotification(): Promise<any> { async sendNotification(): Promise<any> {
return this.client.emit<number>('notification', true); return this.client.emit<number>('notification', true);

View File

@@ -3,7 +3,7 @@ import { loadPackage } from '@nestjs/common/utils/load-package.util';
import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util'; import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util';
import { EventEmitter } from 'events'; import { EventEmitter } from 'events';
import { EmptyError, fromEvent, lastValueFrom, merge, Observable } from 'rxjs'; import { EmptyError, fromEvent, lastValueFrom, merge, Observable } from 'rxjs';
import { first, map, share, switchMap } from 'rxjs/operators'; import { first, map, retryWhen, scan, share, switchMap } from 'rxjs/operators';
import { import {
CONNECT_FAILED_EVENT, CONNECT_FAILED_EVENT,
DISCONNECTED_RMQ_MESSAGE, DISCONNECTED_RMQ_MESSAGE,
@@ -118,7 +118,20 @@ export class ClientRMQ extends ClientProxy {
}), }),
); );
const disconnect$ = eventToError(DISCONNECT_EVENT); const disconnect$ = eventToError(DISCONNECT_EVENT);
const connectFailed$ = eventToError(CONNECT_FAILED_EVENT);
const urls = this.getOptionsProp(this.options, 'urls');
const connectFailed$ = eventToError(CONNECT_FAILED_EVENT).pipe(
retryWhen(e =>
e.pipe(
scan((errorCount, error: any) => {
if (urls.indexOf(error.url) >= urls.length - 1) {
throw error;
}
return errorCount + 1;
}, 0),
),
),
);
return merge(source$, disconnect$, connectFailed$).pipe(first()); return merge(source$, disconnect$, connectFailed$).pipe(first());
} }