mirror of
https://github.com/nestjs/nest.git
synced 2026-02-21 23:11:44 +00:00
refactor(microservices) refactor, improvements, use es6 map
This commit is contained in:
@@ -43,7 +43,7 @@ export class ServerMqtt extends Server implements CustomTransportStrategy {
|
||||
|
||||
public bindEvents(mqttClient: MqttClient) {
|
||||
mqttClient.on(MESSAGE_EVENT, this.getMessageHandler(mqttClient).bind(this));
|
||||
const registeredPatterns = Object.keys(this.messageHandlers);
|
||||
const registeredPatterns = [...this.messageHandlers.keys()];
|
||||
registeredPatterns.forEach(pattern =>
|
||||
mqttClient.subscribe(this.getAckQueueName(pattern)),
|
||||
);
|
||||
@@ -57,7 +57,7 @@ export class ServerMqtt extends Server implements CustomTransportStrategy {
|
||||
return mqttPackage.connect(this.url, this.options as MqttOptions);
|
||||
}
|
||||
|
||||
public getMessageHandler(pub: MqttClient): any {
|
||||
public getMessageHandler(pub: MqttClient): Function {
|
||||
return async (channel, buffer) => this.handleMessage(channel, buffer, pub);
|
||||
}
|
||||
|
||||
@@ -69,12 +69,12 @@ export class ServerMqtt extends Server implements CustomTransportStrategy {
|
||||
const packet = this.deserialize(buffer.toString());
|
||||
const pattern = channel.replace(/_ack$/, '');
|
||||
const publish = this.getPublisher(pub, pattern, packet.id);
|
||||
const status = 'error';
|
||||
const handler = this.getHandlerByPattern(pattern);
|
||||
|
||||
if (!this.messageHandlers[pattern]) {
|
||||
if (!handler) {
|
||||
const status = 'error';
|
||||
return publish({ id: packet.id, status, err: NO_PATTERN_MESSAGE });
|
||||
}
|
||||
const handler = this.messageHandlers[pattern];
|
||||
const response$ = this.transformToObservable(
|
||||
await handler(packet.data),
|
||||
) as Observable<any>;
|
||||
|
||||
Reference in New Issue
Block a user