fix(): resolve merge conflicts

This commit is contained in:
Kamil Myśliwiec
2020-02-20 13:08:39 +01:00
854 changed files with 335677 additions and 14243 deletions

View File

@@ -6,6 +6,7 @@ import {
NATS_DEFAULT_URL,
NO_MESSAGE_HANDLER,
} from '../constants';
import { NatsContext } from '../ctx-host/nats.context';
import { Client } from '../external/nats-client.interface';
import { CustomTransportStrategy, PacketId } from '../interfaces';
import { NatsOptions } from '../interfaces/microservice-configuration.interface';
@@ -75,8 +76,11 @@ export class ServerNats extends Server implements CustomTransportStrategy {
}
public getMessageHandler(channel: string, client: Client): Function {
return async (buffer: ReadPacket & PacketId, replyTo: string) =>
this.handleMessage(channel, buffer, client, replyTo);
return async (
buffer: ReadPacket & PacketId,
replyTo: string,
callerSubject: string,
) => this.handleMessage(channel, buffer, client, replyTo, callerSubject);
}
public async handleMessage(
@@ -84,10 +88,15 @@ export class ServerNats extends Server implements CustomTransportStrategy {
rawMessage: any,
client: Client,
replyTo: string,
callerSubject: string,
) {
const message = this.deserializer.deserialize(rawMessage, { channel });
const natsCtx = new NatsContext([callerSubject]);
const message = this.deserializer.deserialize(rawMessage, {
channel,
replyTo,
});
if (isUndefined((message as IncomingRequest).id)) {
return this.handleEvent(channel, message);
return this.handleEvent(channel, message, natsCtx);
}
const publish = this.getPublisher(
client,
@@ -105,7 +114,7 @@ export class ServerNats extends Server implements CustomTransportStrategy {
return publish(noHandlerPacket);
}
const response$ = this.transformToObservable(
await handler(message.data),
await handler(message.data, natsCtx),
) as Observable<any>;
response$ && this.send(response$, publish);
}