feat() add rpc & ws decorators, add rpc context

This commit is contained in:
Kamil Myśliwiec
2019-09-26 12:44:44 +02:00
parent 0953dd4ec4
commit 1d27dacb2a
79 changed files with 1345 additions and 349 deletions

View File

@@ -7,6 +7,7 @@ import {
MQTT_DEFAULT_URL,
NO_MESSAGE_HANDLER,
} from '../constants';
import { MqttContext } from '../ctx-host/mqtt.context';
import { MqttClient } from '../external/mqtt-client.interface';
import {
CustomTransportStrategy,
@@ -67,19 +68,24 @@ export class ServerMqtt extends Server implements CustomTransportStrategy {
}
public getMessageHandler(pub: MqttClient): Function {
return async (channel: string, buffer: Buffer) =>
this.handleMessage(channel, buffer, pub);
return async (
channel: string,
buffer: Buffer,
originalPacket?: Record<string, any>,
) => this.handleMessage(channel, buffer, pub, originalPacket);
}
public async handleMessage(
channel: string,
buffer: Buffer,
pub: MqttClient,
originalPacket?: Record<string, any>,
): Promise<any> {
const rawPacket = this.parseMessage(buffer.toString());
const packet = this.deserializer.deserialize(rawPacket, { channel });
const mqttContext = new MqttContext([channel, originalPacket]);
if (isUndefined((packet as IncomingRequest).id)) {
return this.handleEvent(channel, packet);
return this.handleEvent(channel, packet, mqttContext);
}
const pattern = channel.replace(/_ack$/, '');
const publish = this.getPublisher(
@@ -99,7 +105,7 @@ export class ServerMqtt extends Server implements CustomTransportStrategy {
return publish(noHandlerPacket);
}
const response$ = this.transformToObservable(
await handler(packet.data),
await handler(packet.data, mqttContext),
) as Observable<any>;
response$ && this.send(response$, publish);
}