chore: mqtt updates, integration test improvements

This commit is contained in:
Kamil Myśliwiec
2024-11-27 11:04:51 +01:00
parent 74e318c930
commit cf17e4d521
5 changed files with 19 additions and 9 deletions

View File

@@ -27,6 +27,7 @@ services:
mysql:
image: mysql:8.3.0
environment:
MYSQL_ROOT_HOST: '%'
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: test
ports:

View File

@@ -2,10 +2,11 @@ import { INestApplication } from '@nestjs/common';
import { Transport } from '@nestjs/microservices';
import { Test } from '@nestjs/testing';
import * as request from 'supertest';
import { App } from 'supertest/types';
import { DisconnectedClientController } from '../src/disconnected.controller';
describe('Disconnected client', () => {
let server;
let server: App;
let app: INestApplication;
beforeEach(async () => {

View File

@@ -7,7 +7,7 @@ import {
} from '@nestjs/common';
import { ClientProxyFactory } from '@nestjs/microservices';
import { Observable, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';
import { catchError, tap } from 'rxjs/operators';
@Controller()
export class DisconnectedClientController {
@@ -31,6 +31,9 @@ export class DisconnectedClientController {
: new InternalServerErrorException(),
);
}),
tap({
error: () => client.close(),
}),
);
}
}

View File

@@ -2,7 +2,7 @@ import { Logger } from '@nestjs/common/services/logger.service';
import { loadPackage } from '@nestjs/common/utils/load-package.util';
import { EmptyError, fromEvent, lastValueFrom, merge, Observable } from 'rxjs';
import { first, map, share, tap } from 'rxjs/operators';
import { ECONNREFUSED, MQTT_DEFAULT_URL } from '../constants';
import { ECONNREFUSED, ENOTFOUND, MQTT_DEFAULT_URL } from '../constants';
import { MqttEvents, MqttEventsMap, MqttStatus } from '../events/mqtt.events';
import { MqttOptions, ReadPacket, WritePacket } from '../interfaces';
import {
@@ -55,8 +55,10 @@ export class ClientMqtt extends ClientProxy<MqttEvents, MqttStatus> {
return `${pattern}/reply`;
}
public close() {
this.mqttClient && this.mqttClient.end();
public async close() {
if (this.mqttClient) {
await this.mqttClient.endAsync();
}
this.mqttClient = null;
this.connectionPromise = null;
this.pendingEventListeners = [];
@@ -113,10 +115,12 @@ export class ClientMqtt extends ClientProxy<MqttEvents, MqttStatus> {
}
public registerErrorListener(client: MqttClient) {
client.on(
MqttEventsMap.ERROR,
(err: any) => err.code !== ECONNREFUSED && this.logger.error(err),
);
client.on(MqttEventsMap.ERROR, (err: any) => {
if (err.code === ECONNREFUSED || err.code === ENOTFOUND) {
return;
}
this.logger.error(err);
});
}
public registerOfflineListener(client: MqttClient) {

View File

@@ -25,6 +25,7 @@ export const RQM_DEFAULT_NO_ASSERT = false;
export const ECONNREFUSED = 'ECONNREFUSED';
export const CONN_ERR = 'CONN_ERR';
export const EADDRINUSE = 'EADDRINUSE';
export const ENOTFOUND = 'ENOTFOUND';
export const PATTERN_METADATA = 'microservices:pattern';
export const PATTERN_EXTRAS_METADATA = 'microservices:pattern_extras';