mirror of
https://github.com/nestjs/nest.git
synced 2026-02-21 23:11:44 +00:00
refactor(microservices) use more descriptive channel names
This commit is contained in:
@@ -31,12 +31,12 @@ export class ClientMqtt extends ClientProxy {
|
||||
this.initializeDeserializer(options);
|
||||
}
|
||||
|
||||
public getAckPatternName(pattern: string): string {
|
||||
return `${pattern}_ack`;
|
||||
public getRequestPattern(pattern: string): string {
|
||||
return pattern;
|
||||
}
|
||||
|
||||
public getResPatternName(pattern: string): string {
|
||||
return `${pattern}_res`;
|
||||
public getResponsePattern(pattern: string): string {
|
||||
return `${pattern}.reply`;
|
||||
}
|
||||
|
||||
public close() {
|
||||
@@ -120,7 +120,7 @@ export class ClientMqtt extends ClientProxy {
|
||||
const packet = this.assignPacketId(partialPacket);
|
||||
const pattern = this.normalizePattern(partialPacket.pattern);
|
||||
const serializedPacket = this.serializer.serialize(packet);
|
||||
const responseChannel = this.getResPatternName(pattern);
|
||||
const responseChannel = this.getResponsePattern(pattern);
|
||||
|
||||
this.mqttClient.subscribe(responseChannel, (err: any) => {
|
||||
if (err) {
|
||||
@@ -128,7 +128,7 @@ export class ClientMqtt extends ClientProxy {
|
||||
}
|
||||
this.routingMap.set(packet.id, callback);
|
||||
this.mqttClient.publish(
|
||||
this.getAckPatternName(pattern),
|
||||
this.getRequestPattern(pattern),
|
||||
JSON.stringify(serializedPacket),
|
||||
);
|
||||
});
|
||||
|
||||
@@ -39,12 +39,12 @@ export class ClientRedis extends ClientProxy {
|
||||
this.initializeDeserializer(options);
|
||||
}
|
||||
|
||||
public getAckPatternName(pattern: string): string {
|
||||
return `${pattern}_ack`;
|
||||
public getRequestPattern(pattern: string): string {
|
||||
return pattern;
|
||||
}
|
||||
|
||||
public getResPatternName(pattern: string): string {
|
||||
return `${pattern}_res`;
|
||||
public getReplyPattern(pattern: string): string {
|
||||
return `${pattern}.reply`;
|
||||
}
|
||||
|
||||
public close() {
|
||||
@@ -151,7 +151,7 @@ export class ClientRedis extends ClientProxy {
|
||||
const packet = this.assignPacketId(partialPacket);
|
||||
const pattern = this.normalizePattern(partialPacket.pattern);
|
||||
const serializedPacket = this.serializer.serialize(packet);
|
||||
const responseChannel = this.getResPatternName(pattern);
|
||||
const responseChannel = this.getReplyPattern(pattern);
|
||||
|
||||
this.routingMap.set(packet.id, callback);
|
||||
this.subClient.subscribe(responseChannel, (err: any) => {
|
||||
@@ -159,7 +159,7 @@ export class ClientRedis extends ClientProxy {
|
||||
return;
|
||||
}
|
||||
this.pubClient.publish(
|
||||
this.getAckPatternName(pattern),
|
||||
this.getRequestPattern(pattern),
|
||||
JSON.stringify(serializedPacket),
|
||||
);
|
||||
});
|
||||
|
||||
@@ -53,7 +53,7 @@ export class ServerMqtt extends Server implements CustomTransportStrategy {
|
||||
registeredPatterns.forEach(pattern => {
|
||||
const { isEventHandler } = this.messageHandlers.get(pattern);
|
||||
mqttClient.subscribe(
|
||||
isEventHandler ? pattern : this.getAckQueueName(pattern),
|
||||
isEventHandler ? pattern : this.getRequestPattern(pattern),
|
||||
);
|
||||
});
|
||||
}
|
||||
@@ -110,7 +110,7 @@ export class ServerMqtt extends Server implements CustomTransportStrategy {
|
||||
const outgoingResponse = this.serializer.serialize(response);
|
||||
|
||||
return client.publish(
|
||||
this.getResQueueName(pattern),
|
||||
this.getReplyPattern(pattern),
|
||||
JSON.stringify(outgoingResponse),
|
||||
);
|
||||
};
|
||||
@@ -124,12 +124,12 @@ export class ServerMqtt extends Server implements CustomTransportStrategy {
|
||||
}
|
||||
}
|
||||
|
||||
public getAckQueueName(pattern: string): string {
|
||||
return `${pattern}_ack`;
|
||||
public getRequestPattern(pattern: string): string {
|
||||
return pattern;
|
||||
}
|
||||
|
||||
public getResQueueName(pattern: string): string {
|
||||
return `${pattern}_res`;
|
||||
public getReplyPattern(pattern: string): string {
|
||||
return `${pattern}.reply`;
|
||||
}
|
||||
|
||||
public handleError(stream: any) {
|
||||
|
||||
@@ -56,7 +56,7 @@ export class ServerRedis extends Server implements CustomTransportStrategy {
|
||||
subscribePatterns.forEach(pattern => {
|
||||
const { isEventHandler } = this.messageHandlers.get(pattern);
|
||||
subClient.subscribe(
|
||||
isEventHandler ? pattern : this.getAckQueueName(pattern),
|
||||
isEventHandler ? pattern : this.getRequestPattern(pattern),
|
||||
);
|
||||
});
|
||||
}
|
||||
@@ -118,7 +118,7 @@ export class ServerRedis extends Server implements CustomTransportStrategy {
|
||||
const outgoingResponse = this.serializer.serialize(response);
|
||||
|
||||
return pub.publish(
|
||||
this.getResQueueName(pattern),
|
||||
this.getReplyPattern(pattern),
|
||||
JSON.stringify(outgoingResponse),
|
||||
);
|
||||
};
|
||||
@@ -132,12 +132,12 @@ export class ServerRedis extends Server implements CustomTransportStrategy {
|
||||
}
|
||||
}
|
||||
|
||||
public getAckQueueName(pattern: string): string {
|
||||
return `${pattern}_ack`;
|
||||
public getRequestPattern(pattern: string): string {
|
||||
return pattern;
|
||||
}
|
||||
|
||||
public getResQueueName(pattern: string): string {
|
||||
return `${pattern}_res`;
|
||||
public getReplyPattern(pattern: string): string {
|
||||
return `${pattern}.reply`;
|
||||
}
|
||||
|
||||
public handleError(stream: any) {
|
||||
|
||||
Reference in New Issue
Block a user