feat(microservices) add matchMqttPattern to server mqtt

This commit is contained in:
Hiep Thai
2019-09-14 11:13:37 +07:00
parent b4baa995e4
commit 52bfb856ac
4 changed files with 64 additions and 1 deletions

View File

@@ -6,11 +6,15 @@ import {
MESSAGE_EVENT,
MQTT_DEFAULT_URL,
NO_MESSAGE_HANDLER,
MQTT_SEPARATOR,
MQTT_WILDCARD_ALL,
MQTT_WILDCARD_SINGLE,
} from '../constants';
import { MqttClient } from '../external/mqtt-client.interface';
import {
CustomTransportStrategy,
IncomingRequest,
MessageHandler,
PacketId,
ReadPacket,
} from '../interfaces';
@@ -66,6 +70,46 @@ export class ServerMqtt extends Server implements CustomTransportStrategy {
return mqttPackage.connect(this.url, this.options as MqttOptions);
}
public matchMqttPattern(pattern, topic) {
const patternSegments = pattern.split(MQTT_SEPARATOR);
const topicSegments = topic.split(MQTT_SEPARATOR);
const patternLength = patternSegments.length;
const topicLength = topicSegments.length;
const lastIndex = patternLength - 1;
for (let i = 0; i < patternLength; i++) {
const currentPattern = patternSegments[i];
const patternChar = currentPattern[0];
const currentTopic = topicSegments[i];
if (!currentTopic && !currentPattern) continue;
if (!currentTopic && currentPattern !== MQTT_WILDCARD_ALL) return false;
if (patternChar === MQTT_WILDCARD_ALL) return i === lastIndex;
if (
patternChar !== MQTT_WILDCARD_SINGLE &&
currentPattern !== currentTopic
)
return false;
}
return patternLength === topicLength;
}
public getHandlerByPattern(pattern: string): MessageHandler | null {
const route = this.getRouteFromPattern(pattern);
for (const [key, value] of this.messageHandlers) {
if (this.matchMqttPattern(key, route)) {
return value;
}
}
return null;
}
public getMessageHandler(pub: MqttClient): Function {
return async (channel: string, buffer: Buffer) =>
this.handleMessage(channel, buffer, pub);