feature(microservices) add a few options to the nats strategy

This commit is contained in:
Kamil Myśliwiec
2018-10-06 15:02:55 +02:00
parent 1eef51345f
commit 06d0f12169
3 changed files with 25 additions and 9 deletions

View File

@@ -37,7 +37,7 @@ export interface GrpcOptions {
oneofs?: boolean;
json?: boolean;
includeDirs?: string[];
}
};
};
}
@@ -76,6 +76,9 @@ export interface NatsOptions {
maxReconnectAttempts?: number;
reconnectTimeWait?: number;
servers?: string[];
reconnect?: boolean;
pedantic?: boolean;
tls?: any;
queue?: string;
};
}

View File

@@ -40,13 +40,22 @@ export class ServerNats extends Server implements CustomTransportStrategy {
}
public bindEvents(client: Client) {
const registeredPatterns = Object.keys(this.messageHandlers);
registeredPatterns.forEach(channel => {
const queue = this.getOptionsProp<NatsOptions>(this.options, 'queue');
const subscribe = (channel: string) => {
if (queue) {
return client.subscribe(
channel,
{ queue },
this.getMessageHandler(channel, client).bind(this),
);
}
client.subscribe(
channel,
this.getMessageHandler(channel, client).bind(this),
);
});
};
const registeredPatterns = Object.keys(this.messageHandlers);
registeredPatterns.forEach(channel => subscribe(channel));
}
public close() {
@@ -64,7 +73,8 @@ export class ServerNats extends Server implements CustomTransportStrategy {
}
public getMessageHandler(channel: string, client: Client) {
return async (buffer, replyTo: string) => this.handleMessage(channel, buffer, client, replyTo);
return async (buffer, replyTo: string) =>
this.handleMessage(channel, buffer, client, replyTo);
}
public async handleMessage(
@@ -88,9 +98,12 @@ export class ServerNats extends Server implements CustomTransportStrategy {
public getPublisher(publisher: Client, replyTo: string, id: string) {
return response =>
publisher.publish(replyTo, Object.assign(response, {
id,
}) as any);
publisher.publish(
replyTo,
Object.assign(response, {
id,
}),
);
}
public handleError(stream) {

View File

@@ -18,7 +18,7 @@ export class NatsStrategy extends ServerNats {
handlers.forEach(({ key, value }) =>
client.subscribe(
value.pattern,
value.queue,
{ queue: value.queue },
this.getMessageHandler(key, client).bind(this),
),
);